Introduce StartedNode (#1491)

This commit is contained in:
Andrzej Cichocki
2017-09-13 17:34:52 +01:00
committed by GitHub
parent c8b1eb5a1e
commit cfd6739d23
54 changed files with 468 additions and 383 deletions

View File

@ -16,7 +16,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
@ -139,7 +139,7 @@ class BFTNotaryServiceTests {
}
}
private fun AbstractNode.signInitialTransaction(
private fun StartedNode<*>.signInitialTransaction(
notary: Party,
block: TransactionBuilder.() -> Any?
): SignedTransaction {

View File

@ -11,7 +11,7 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.contracts.DUMMY_PROGRAM_ID
import net.corda.testing.contracts.DummyContract
@ -58,7 +58,7 @@ class RaftNotaryServiceTests : NodeBasedTest() {
assertEquals(error.txId, secondSpendTx.id)
}
private fun issueState(node: AbstractNode, notary: Party): StateAndRef<*> {
private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> {
return node.database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
val stx = node.services.signInitialTransaction(builder)

View File

@ -19,7 +19,7 @@ class FlowVersioningTest : NodeBasedTest() {
val (alice, bob) = listOf(
startNode(ALICE.name, platformVersion = 2),
startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow()
bob.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow)
bob.internals.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow)
val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow(
PretendInitiatingCoreFlow(bob.info.legalIdentity)).resultFuture.getOrThrow()
assertThat(alicePlatformVersionAccordingToBob).isEqualTo(2)

View File

@ -28,7 +28,7 @@ import java.nio.file.Files
*/
class MQSecurityAsNodeTest : MQSecurityTest() {
override fun createAttacker(): SimpleMQClient {
return clientTo(alice.configuration.p2pAddress)
return clientTo(alice.internals.configuration.p2pAddress)
}
override fun startAttacker(attacker: SimpleMQClient) {
@ -42,7 +42,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `only the node running the broker can login using the special node user`() {
val attacker = clientTo(alice.configuration.p2pAddress)
val attacker = clientTo(alice.internals.configuration.p2pAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER)
}
@ -50,7 +50,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `login as the default cluster user`() {
val attacker = clientTo(alice.configuration.p2pAddress)
val attacker = clientTo(alice.internals.configuration.p2pAddress)
assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy {
attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword())
}
@ -58,7 +58,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `login without a username and password`() {
val attacker = clientTo(alice.configuration.p2pAddress)
val attacker = clientTo(alice.internals.configuration.p2pAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start()
}
@ -66,7 +66,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `login to a non ssl port as a node user`() {
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER, enableSSL = false)
}
@ -74,7 +74,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `login to a non ssl port as a peer user`() {
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(PEER_USER, PEER_USER, enableSSL = false) // Login as a peer
}
@ -128,7 +128,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
}
}
val attacker = clientTo(alice.configuration.p2pAddress, sslConfig)
val attacker = clientTo(alice.internals.configuration.p2pAddress, sslConfig)
assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
attacker.start(PEER_USER, PEER_USER)

View File

@ -12,7 +12,7 @@ import org.junit.Test
*/
class MQSecurityAsRPCTest : MQSecurityTest() {
override fun createAttacker(): SimpleMQClient {
return clientTo(alice.configuration.rpcAddress!!)
return clientTo(alice.internals.configuration.rpcAddress!!)
}
@Test
@ -30,7 +30,7 @@ class MQSecurityAsRPCTest : MQSecurityTest() {
@Test
fun `login to a ssl port as a RPC user`() {
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
loginToRPC(alice.configuration.p2pAddress, extraRPCUsers[0], configureTestSSL())
loginToRPC(alice.internals.configuration.p2pAddress, extraRPCUsers[0], configureTestSSL())
}
}
}

View File

@ -16,6 +16,7 @@ import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
@ -43,7 +44,7 @@ import kotlin.test.assertEquals
*/
abstract class MQSecurityTest : NodeBasedTest() {
val rpcUser = User("user1", "pass", permissions = emptySet())
lateinit var alice: Node
lateinit var alice: StartedNode<Node>
lateinit var attacker: SimpleMQClient
private val clients = ArrayList<SimpleMQClient>()
@ -155,9 +156,9 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
fun loginToRPCAndGetClientQueue(): String {
loginToRPC(alice.configuration.rpcAddress!!, rpcUser)
loginToRPC(alice.internals.configuration.rpcAddress!!, rpcUser)
val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
val client = clientTo(alice.configuration.rpcAddress!!)
val client = clientTo(alice.internals.configuration.rpcAddress!!)
client.start(rpcUser.username, rpcUser.password, false)
return client.session.addressQuery(clientQueueQuery).queueNames.single().toString()
}
@ -217,7 +218,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode(BOB.name).getOrThrow()
bob.registerInitiatedFlow(ReceiveFlow::class.java)
bob.internals.registerInitiatedFlow(ReceiveFlow::class.java)
val bobParty = bob.info.legalIdentity
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()

View File

@ -14,7 +14,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
@ -149,7 +149,7 @@ class P2PMessagingTest : NodeBasedTest() {
// Wait until the first request is received
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.stop()
alice.dispose()
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
@ -174,7 +174,7 @@ class P2PMessagingTest : NodeBasedTest() {
* either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true.
* This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response.
*/
private fun simulateCrashingNodes(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): CrashingNodes {
private fun simulateCrashingNodes(distributedServiceNodes: List<StartedNode<*>>, dummyTopic: String, responseMessage: String): CrashingNodes {
val crashingNodes = CrashingNodes(
requestsReceived = AtomicInteger(0),
firstRequestReceived = CountDownLatch(1),
@ -203,7 +203,7 @@ class P2PMessagingTest : NodeBasedTest() {
return crashingNodes
}
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: CordaX500Name, originatingNode: Node) {
private fun assertAllNodesAreUsed(participatingServiceNodes: List<StartedNode<*>>, serviceName: CordaX500Name, originatingNode: StartedNode<*>) {
// Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used
participatingServiceNodes.forEach { node ->
node.respondWith(node.info)
@ -221,10 +221,10 @@ class P2PMessagingTest : NodeBasedTest() {
break
}
}
assertThat(participatingNodes).containsOnlyElementsOf(participatingServiceNodes.map(Node::info))
assertThat(participatingNodes).containsOnlyElementsOf(participatingServiceNodes.map(StartedNode<*>::info))
}
private fun Node.respondWith(message: Any) {
private fun StartedNode<*>.respondWith(message: Any) {
network.addMessageHandler(javaClass.name) { netMessage, _ ->
val request = netMessage.data.deserialize<TestRequest>()
val response = network.createMessage(javaClass.name, request.sessionID, message.serialize().bytes)
@ -232,7 +232,7 @@ class P2PMessagingTest : NodeBasedTest() {
}
}
private fun Node.receiveFrom(target: MessageRecipients): CordaFuture<Any> {
private fun StartedNode<*>.receiveFrom(target: MessageRecipients): CordaFuture<Any> {
val request = TestRequest(replyTo = network.myAddress)
return network.sendRequest<Any>(javaClass.name, request, target)
}

View File

@ -33,7 +33,7 @@ class P2PSecurityTest : NodeBasedTest() {
val incorrectNetworkMapName = getX500Name(O = "NetworkMap-${random63BitValue()}", L = "London", C = "GB")
val node = startNode(BOB.name, configOverrides = mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.configuration.p2pAddress.toString(),
"address" to networkMapNode.internals.configuration.p2pAddress.toString(),
"legalName" to incorrectNetworkMapName.toString()
)
))
@ -59,7 +59,7 @@ class P2PSecurityTest : NodeBasedTest() {
val config = testNodeConfiguration(
baseDirectory = baseDirectory(legalName.x500Name),
myLegalName = legalName).also {
whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.configuration.p2pAddress, networkMapNode.info.legalIdentity.name))
whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.legalIdentity.name))
}
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
return SimpleNode(config, trustRoot = trustRoot).apply { start() }

View File

@ -45,12 +45,9 @@ import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.*
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
import net.corda.node.services.network.NodeRegistration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
@ -104,6 +101,18 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val advertisedServices: Set<ServiceInfo>,
val platformClock: Clock,
@VisibleForTesting val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
private class StartedNodeImpl<out N : AbstractNode>(
override val internals: N,
override val services: ServiceHubInternalImpl,
override val info: NodeInfo,
override val checkpointStorage: CheckpointStorage,
override val smm: StateMachineManager,
override val attachments: NodeAttachmentService,
override val inNodeNetworkMapService: NetworkMapService,
override val network: MessagingService,
override val database: CordaPersistence,
override val rpcOps: CordaRPCOps) : StartedNode<N>
// TODO: Persist this, as well as whether the node is registered.
/**
* Sequence number of changes sent to the network map service, when registering/de-registering this node.
@ -122,17 +131,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
protected val partyKeys = mutableSetOf<KeyPair>()
val services: ServiceHubInternal get() = _services
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
lateinit var info: NodeInfo
lateinit var checkpointStorage: CheckpointStorage
lateinit var smm: StateMachineManager
lateinit var attachments: NodeAttachmentService
var inNodeNetworkMapService: NetworkMapService? = null
lateinit var network: MessagingService
protected lateinit var info: NodeInfo
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
protected lateinit var attachments: NodeAttachmentService
protected lateinit var inNodeNetworkMapService: NetworkMapService
protected lateinit var network: MessagingService
protected val runOnStop = ArrayList<() -> Any?>()
lateinit var database: CordaPersistence
protected lateinit var database: CordaPersistence
protected var dbCloser: (() -> Any?)? = null
protected val _nodeReadyFuture = openFuture<Unit>()
@ -160,17 +168,17 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
cordappLoader.cordapps.flatMap { it.plugins } + DefaultWhitelist()
}
/** Set to true once [start] has been successfully called. */
@Volatile
var started = false
private set
/** Set to non-null once [start] has been successfully called. */
open val started get() = _started
@Volatile private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM.
open fun start() {
require(!started) { "Node has already been started" }
open fun makeRPCOps(): CordaRPCOps {
return CordaRPCOpsImpl(services, smm, database)
}
open fun start(): StartedNode<AbstractNode> {
require(started == null) { "Node has already been started" }
if (configuration.devMode) {
log.warn("Corda node is running in dev mode.")
configuration.configureWithDevSSLCertificate()
@ -180,7 +188,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
log.info("Node starting up ...")
// Do all of this in a database transaction so anything that might need a connection has one.
initialiseDatabasePersistence {
val startedImpl = initialiseDatabasePersistence {
val tokenizableServices = makeServices()
smm = StateMachineManager(services,
@ -202,6 +210,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
makeVaultObservers()
val rpcOps = makeRPCOps()
startMessagingService(rpcOps)
installCoreFlows()
@ -211,16 +220,19 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
registerCustomSchemas(cordappLoader.cordapps.flatMap { it.customSchemas }.toSet())
runOnStop += network::stop
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps)
}
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured())
database.transaction {
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
_services.schedulerService.start()
return startedImpl.apply {
database.transaction {
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
services.schedulerService.start()
}
_started = this
}
started = true
}
private class ServiceInstantiationException(cause: Throwable?) : Exception(cause)
@ -409,7 +421,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : Exception(msg)
protected open fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
protected open fun <T> initialiseDatabasePersistence(insideTransaction: () -> T): T {
val props = configuration.dataSourceProperties
if (props.isNotEmpty()) {
this.database = configureDatabase(props, configuration.database, { _services.schemaService }, createIdentityService = { _services.identityService })
@ -421,7 +433,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
dbCloser = it
runOnStop += it
}
database.transaction {
return database.transaction {
insideTransaction()
}
} else {
@ -431,7 +443,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeAdvertisedServices(tokenizableServices: MutableList<Any>) {
val serviceTypes = info.advertisedServices.map { it.info.type }
if (NetworkMapService.type in serviceTypes) makeNetworkMapService()
inNodeNetworkMapService = if (NetworkMapService.type in serviceTypes) makeNetworkMapService() else NullNetworkMapService
val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
if (notaryServiceType != null) {
val service = makeCoreNotaryService(notaryServiceType)
@ -452,7 +464,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun registerWithNetworkMapIfConfigured(): CordaFuture<Unit> {
services.networkMapCache.addNode(info)
// In the unit test environment, we may sometimes run without any network map service
return if (networkMapAddress == null && inNodeNetworkMapService == null) {
return if (networkMapAddress == null && inNodeNetworkMapService == NullNetworkMapService) {
services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else {
@ -513,8 +525,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return PersistentKeyManagementService(identityService, partyKeys)
}
open protected fun makeNetworkMapService() {
inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
open protected fun makeNetworkMapService(): NetworkMapService {
return PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
}
open protected fun makeCoreNotaryService(type: ServiceType): NotaryService? {

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
@ -279,7 +280,7 @@ open class Node(override val configuration: FullNodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/
override fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
override fun <T> initialiseDatabasePersistence(insideTransaction: () -> T): T {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:"
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
@ -296,25 +297,24 @@ open class Node(override val configuration: FullNodeConfiguration,
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
}
}
super.initialiseDatabasePersistence(insideTransaction)
return super.initialiseDatabasePersistence(insideTransaction)
}
private val _startupComplete = openFuture<Unit>()
val startupComplete: CordaFuture<Unit> get() = _startupComplete
override fun start() {
override fun start(): StartedNode<Node> {
if (initialiseSerialization) {
initialiseSerialization()
}
super.start()
val started: StartedNode<Node> = uncheckedCast(super.start())
nodeReadyFuture.thenMatch({
serverThread.execute {
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
//
// https://jolokia.org/agent/jvm.html
JmxReporter.
forRegistry(services.monitoringService.metrics).
forRegistry(started.services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
@ -336,6 +336,7 @@ open class Node(override val configuration: FullNodeConfiguration,
shutdownHook = addShutdownHook {
stop()
}
return started
}
private fun initialiseSerialization() {

View File

@ -92,18 +92,16 @@ open class NodeStartup(val args: Array<String>) {
open protected fun startNode(conf: FullNodeConfiguration, versionInfo: VersionInfo, startTime: Long, cmdlineOptions: CmdLineOptions) {
val advertisedServices = conf.calculateServices()
val node = createNode(conf, versionInfo, advertisedServices)
node.start()
printPluginsAndServices(node)
node.nodeReadyFuture.thenMatch({
val node = createNode(conf, versionInfo, advertisedServices).start()
printPluginsAndServices(node.internals)
node.internals.nodeReadyFuture.thenMatch({
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
val name = node.info.legalIdentity.name.organisation
Node.printBasicNodeInfo("Node for \"$name\" started up and registered in $elapsed sec")
// Don't start the shell if there's no console attached.
val runShell = !cmdlineOptions.noLocalShell && System.console() != null
node.startupComplete.then {
node.internals.startupComplete.then {
try {
InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, node)
} catch(e: Throwable) {
@ -114,7 +112,7 @@ open class NodeStartup(val args: Array<String>) {
{
th -> logger.error("Unexpected exception during registration", th)
})
node.run()
node.internals.run()
}
open protected fun logStartupInfo(versionInfo: VersionInfo, cmdlineOptions: CmdLineOptions, conf: FullNodeConfiguration) {

View File

@ -0,0 +1,25 @@
package net.corda.node.internal
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
interface StartedNode<out N : AbstractNode> {
val internals: N
val services: ServiceHubInternal
val info: NodeInfo
val checkpointStorage: CheckpointStorage
val smm: StateMachineManager
val attachments: NodeAttachmentService
val inNodeNetworkMapService: NetworkMapService
val network: MessagingService
val database: CordaPersistence
val rpcOps: CordaRPCOps
fun dispose() = internals.stop()
}

View File

@ -114,6 +114,8 @@ interface NetworkMapService {
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
}
object NullNetworkMapService : NetworkMapService
@ThreadSafe
class InMemoryNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int)
: AbstractNetworkMapService(services, minimumPlatformVersion) {

View File

@ -24,6 +24,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.statemachine.FlowStateMachineImpl
@ -75,13 +76,13 @@ import kotlin.concurrent.thread
object InteractiveShell {
private val log = loggerFor<InteractiveShell>()
private lateinit var node: Node
private lateinit var node: StartedNode<Node>
/**
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals.
*/
fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: Node) {
fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: StartedNode<Node>) {
this.node = node
var runSSH = runSSHServer
@ -136,7 +137,7 @@ object InteractiveShell {
jlineProcessor.closed()
log.info("Command shell has exited")
terminal.restore()
node.stop()
node.dispose()
}
}
@ -168,7 +169,7 @@ object InteractiveShell {
}
}
val attributes = mapOf(
"node" to node,
"node" to node.internals,
"services" to node.services,
"ops" to node.rpcOps,
"mapper" to yamlInputMapper

View File

@ -23,6 +23,7 @@ import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.internal.StartedNode
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.network.NetworkMapService
@ -55,8 +56,8 @@ class CordaRPCOpsImplTest {
}
lateinit var mockNet: MockNetwork
lateinit var aliceNode: MockNode
lateinit var notaryNode: MockNode
lateinit var aliceNode: StartedNode<MockNode>
lateinit var notaryNode: StartedNode<MockNode>
lateinit var rpc: CordaRPCOps
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
lateinit var transactions: Observable<SignedTransaction>
@ -75,7 +76,7 @@ class CordaRPCOpsImplTest {
))))
mockNet.runNetwork()
networkMap.ensureRegistered()
networkMap.internals.ensureRegistered()
}
@After

View File

@ -36,7 +36,7 @@ import net.corda.finance.contracts.CommercialPaper
import net.corda.finance.contracts.asset.*
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.persistence.DBTransactionStorage
@ -46,6 +46,7 @@ import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.pumpReceive
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
@ -98,8 +99,8 @@ class TwoPartyTradeFlowTests {
val cashIssuer = bankNode.info.legalIdentity.ref(1)
val cpIssuer = bankNode.info.legalIdentity.ref(1, 2, 3)
aliceNode.disableDBCloseOnStop()
bobNode.disableDBCloseOnStop()
aliceNode.internals.disableDBCloseOnStop()
bobNode.internals.disableDBCloseOnStop()
bobNode.database.transaction {
bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, outputNotary = notaryNode.info.notaryIdentity,
@ -120,17 +121,17 @@ class TwoPartyTradeFlowTests {
// assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id])
assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow())
aliceNode.stop()
bobNode.stop()
aliceNode.dispose()
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
}
aliceNode.manuallyCloseDB()
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
}
bobNode.manuallyCloseDB()
bobNode.internals.manuallyCloseDB()
}
}
@ -145,8 +146,8 @@ class TwoPartyTradeFlowTests {
val bankNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOC.name)
val issuer = bankNode.info.legalIdentity.ref(1)
aliceNode.disableDBCloseOnStop()
bobNode.disableDBCloseOnStop()
aliceNode.internals.disableDBCloseOnStop()
bobNode.internals.disableDBCloseOnStop()
val cashStates = bobNode.database.transaction {
bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, notaryNode.info.notaryIdentity, 3, 3,
@ -174,17 +175,17 @@ class TwoPartyTradeFlowTests {
assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow())
aliceNode.stop()
bobNode.stop()
aliceNode.dispose()
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
}
aliceNode.manuallyCloseDB()
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
}
bobNode.manuallyCloseDB()
bobNode.internals.manuallyCloseDB()
}
}
@ -212,8 +213,8 @@ class TwoPartyTradeFlowTests {
bobNode.database.transaction {
bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.legalIdentityAndCert)
}
aliceNode.disableDBCloseOnStop()
bobNode.disableDBCloseOnStop()
aliceNode.internals.disableDBCloseOnStop()
bobNode.internals.disableDBCloseOnStop()
val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle
val networkMapAddress = notaryNode.network.myAddress
@ -255,7 +256,7 @@ class TwoPartyTradeFlowTests {
assertThat(bobTransactionsBeforeCrash).isNotEmpty
// .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk.
bobNode.stop()
bobNode.dispose()
// Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use.
// She will wait around until Bob comes back.
@ -272,7 +273,7 @@ class TwoPartyTradeFlowTests {
entropyRoot: BigInteger): MockNetwork.MockNode {
return MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, bobAddr.id, overrideServices, entropyRoot)
}
}, true, BOB.name)
}, BOB.name)
// Find the future representing the result of this state machine again.
val bobFuture = bobNode.smm.findStateMachines(BuyerAcceptor::class.java).single().second
@ -298,8 +299,8 @@ class TwoPartyTradeFlowTests {
assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash)
}
aliceNode.manuallyCloseDB()
bobNode.manuallyCloseDB()
aliceNode.internals.manuallyCloseDB()
bobNode.internals.manuallyCloseDB()
}
}
@ -307,7 +308,7 @@ class TwoPartyTradeFlowTests {
// of gets and puts.
private fun makeNodeWithTracking(
networkMapAddress: SingleMessageRecipient?,
name: CordaX500Name): MockNetwork.MockNode {
name: CordaX500Name): StartedNode<MockNetwork.MockNode> {
// Create a node in the mock network ...
return mockNet.createNode(networkMapAddress, nodeFactory = object : MockNetwork.Factory<MockNetwork.MockNode> {
override fun create(config: NodeConfiguration,
@ -337,7 +338,7 @@ class TwoPartyTradeFlowTests {
val issuer = bankNode.info.legalIdentity.ref(1, 2, 3)
mockNet.runNetwork()
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode)
allNodes.forEach { node ->
@ -448,7 +449,7 @@ class TwoPartyTradeFlowTests {
val issuer = bankNode.info.legalIdentity.ref(1, 2, 3)
mockNet.runNetwork()
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode)
allNodes.forEach { node ->
@ -548,12 +549,12 @@ class TwoPartyTradeFlowTests {
val sellerId: StateMachineRunId
)
private fun runBuyerAndSeller(notaryNode: MockNetwork.MockNode,
sellerNode: MockNetwork.MockNode,
buyerNode: MockNetwork.MockNode,
private fun runBuyerAndSeller(notaryNode: StartedNode<MockNetwork.MockNode>,
sellerNode: StartedNode<MockNetwork.MockNode>,
buyerNode: StartedNode<MockNetwork.MockNode>,
assetToSell: StateAndRef<OwnableState>,
anonymous: Boolean = true): RunResult {
val buyerFlows: Observable<out FlowLogic<*>> = buyerNode.registerInitiatedFlow(BuyerAcceptor::class.java)
val buyerFlows: Observable<out FlowLogic<*>> = buyerNode.internals.registerInitiatedFlow(BuyerAcceptor::class.java)
val firstBuyerFiber = buyerFlows.toFuture().map { it.stateMachine }
val seller = SellerInitiator(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, anonymous)
val sellerResult = sellerNode.services.startFlow(seller).resultFuture
@ -610,7 +611,7 @@ class TwoPartyTradeFlowTests {
val issuer = bankNode.info.legalIdentity.ref(1, 2, 3)
mockNet.runNetwork()
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
// Let the nodes know about each other - normally the network map would handle this
val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode)
@ -653,9 +654,9 @@ class TwoPartyTradeFlowTests {
private fun insertFakeTransactions(
wtxToSign: List<WireTransaction>,
node: AbstractNode,
notaryNode: AbstractNode,
vararg extraSigningNodes: AbstractNode): Map<SecureHash, SignedTransaction> {
node: StartedNode<*>,
notaryNode: StartedNode<*>,
vararg extraSigningNodes: StartedNode<*>): Map<SecureHash, SignedTransaction> {
val signed = wtxToSign.map {
val id = it.id

View File

@ -11,7 +11,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.DUMMY_NOTARY
@ -31,10 +31,10 @@ import kotlin.test.assertTrue
class NotaryChangeTests {
lateinit var mockNet: MockNetwork
lateinit var oldNotaryNode: MockNetwork.MockNode
lateinit var newNotaryNode: MockNetwork.MockNode
lateinit var clientNodeA: MockNetwork.MockNode
lateinit var clientNodeB: MockNetwork.MockNode
lateinit var oldNotaryNode: StartedNode<MockNetwork.MockNode>
lateinit var newNotaryNode: StartedNode<MockNetwork.MockNode>
lateinit var clientNodeA: StartedNode<MockNetwork.MockNode>
lateinit var clientNodeB: StartedNode<MockNetwork.MockNode>
@Before
fun setUp() {
@ -47,7 +47,7 @@ class NotaryChangeTests {
newNotaryNode = mockNet.createNode(networkMapAddress = oldNotaryNode.network.myAddress, advertisedServices = ServiceInfo(SimpleNotaryService.type))
mockNet.runNetwork() // Clear network map registration messages
oldNotaryNode.ensureRegistered()
oldNotaryNode.internals.ensureRegistered()
}
@After
@ -132,7 +132,7 @@ class NotaryChangeTests {
}
}
private fun issueEncumberedState(node: AbstractNode, notaryNode: AbstractNode): WireTransaction {
private fun issueEncumberedState(node: StartedNode<*>, notaryNode: StartedNode<*>): WireTransaction {
val owner = node.info.legalIdentity.ref(0)
val notary = notaryNode.info.notaryIdentity
@ -160,7 +160,7 @@ class NotaryChangeTests {
// - The transaction type is not a notary change transaction at all.
}
fun issueState(node: AbstractNode, notaryNode: AbstractNode): StateAndRef<*> {
fun issueState(node: StartedNode<*>, notaryNode: StartedNode<*>): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
val signedByNode = node.services.signInitialTransaction(tx)
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)
@ -168,7 +168,7 @@ fun issueState(node: AbstractNode, notaryNode: AbstractNode): StateAndRef<*> {
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
}
fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: AbstractNode): StateAndRef<DummyContract.MultiOwnerState> {
fun issueMultiPartyState(nodeA: StartedNode<*>, nodeB: StartedNode<*>, notaryNode: StartedNode<*>): StateAndRef<DummyContract.MultiOwnerState> {
val state = TransactionState(DummyContract.MultiOwnerState(0,
listOf(nodeA.info.legalIdentity, nodeB.info.legalIdentity)), DUMMY_PROGRAM_ID, notaryNode.info.notaryIdentity)
val tx = TransactionBuilder(notary = notaryNode.info.notaryIdentity).withItems(state, dummyCommand())
@ -181,7 +181,7 @@ fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: A
return stateAndRef
}
fun issueInvalidState(node: AbstractNode, notary: Party): StateAndRef<*> {
fun issueInvalidState(node: StartedNode<*>, notary: Party): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
tx.setTimeWindow(Instant.now(), 30.seconds)
val stx = node.services.signInitialTransaction(tx)

View File

@ -16,12 +16,12 @@ import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.contracts.DUMMY_PROGRAM_ID
import net.corda.testing.contracts.DummyContract
import net.corda.testing.dummyCommand
import net.corda.testing.node.MockNetwork
import org.junit.After
@ -37,9 +37,9 @@ class ScheduledFlowTests {
val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC)))
}
lateinit var mockNet: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var nodeA: MockNetwork.MockNode
lateinit var nodeB: MockNetwork.MockNode
lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
lateinit var nodeA: StartedNode<MockNetwork.MockNode>
lateinit var nodeB: StartedNode<MockNetwork.MockNode>
data class ScheduledState(val creationTime: Instant,
val source: Party,
@ -101,12 +101,14 @@ class ScheduledFlowTests {
notaryNode = mockNet.createNode(
legalName = DUMMY_NOTARY.name,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
nodeA = mockNet.createNode(notaryNode.network.myAddress, start = false)
nodeB = mockNet.createNode(notaryNode.network.myAddress, start = false)
val a = mockNet.createUnstartedNode(notaryNode.network.myAddress)
val b = mockNet.createUnstartedNode(notaryNode.network.myAddress)
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
mockNet.startNodes()
nodeA = a.started!!
nodeB = b.started!!
}
@After

View File

@ -7,6 +7,8 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.send
@ -42,8 +44,8 @@ import java.util.concurrent.LinkedBlockingQueue
abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService> {
lateinit var mockNet: MockNetwork
lateinit var mapServiceNode: MockNode
lateinit var alice: MockNode
lateinit var mapServiceNode: StartedNode<MockNode>
lateinit var alice: StartedNode<MockNode>
companion object {
val subscriberLegalName = CordaX500Name(organisation ="Subscriber", locality ="New York", country ="US")
@ -188,7 +190,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
assertThat(updates.last().wireReg.verified().serial).isEqualTo(serial)
}
private fun MockNode.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List<Changed> {
private fun StartedNode<*>.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List<Changed> {
val request = FetchMapRequest(subscribe, ifChangedSinceVersion, network.myAddress)
val response = services.networkService.sendRequest<FetchMapResponse>(FETCH_TOPIC, request, mapServiceNode.network.myAddress)
mockNet.runNetwork()
@ -200,7 +202,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
REMOVE -> Removed(node)
}
private fun MockNode.identityQuery(): NodeInfo? {
private fun StartedNode<*>.identityQuery(): NodeInfo? {
val request = QueryIdentityRequest(info.legalIdentityAndCert, network.myAddress)
val response = services.networkService.sendRequest<QueryIdentityResponse>(QUERY_TOPIC, request, mapServiceNode.network.myAddress)
mockNet.runNetwork()
@ -209,7 +211,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
private var lastSerial = Long.MIN_VALUE
private fun MockNode.registration(addOrRemove: AddOrRemove,
private fun StartedNode<*>.registration(addOrRemove: AddOrRemove,
serial: Long? = null): CordaFuture<RegistrationResponse> {
val distinctSerial = if (serial == null) {
++lastSerial
@ -225,7 +227,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
return response
}
private fun MockNode.subscribe(): Queue<Update> {
private fun StartedNode<*>.subscribe(): Queue<Update> {
val request = SubscribeRequest(true, network.myAddress)
val updates = LinkedBlockingQueue<Update>()
services.networkService.addMessageHandler(PUSH_TOPIC) { message, _ ->
@ -237,37 +239,37 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
return updates
}
private fun MockNode.unsubscribe() {
private fun StartedNode<*>.unsubscribe() {
val request = SubscribeRequest(false, network.myAddress)
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress)
mockNet.runNetwork()
assertThat(response.getOrThrow().confirmed).isTrue()
}
private fun MockNode.ackUpdate(mapVersion: Int) {
private fun StartedNode<*>.ackUpdate(mapVersion: Int) {
val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(PUSH_ACK_TOPIC, MessagingService.DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress)
mockNet.runNetwork()
}
private fun addNewNodeToNetworkMap(legalName: CordaX500Name): MockNode {
private fun addNewNodeToNetworkMap(legalName: CordaX500Name): StartedNode<MockNode> {
val node = mockNet.createNode(mapServiceNode.network.myAddress, legalName = legalName)
mockNet.runNetwork()
lastSerial = System.currentTimeMillis()
return node
}
private fun newNodeSeparateFromNetworkMap(legalName: CordaX500Name): MockNode {
private fun newNodeSeparateFromNetworkMap(legalName: CordaX500Name): StartedNode<MockNode> {
return mockNet.createNode(legalName = legalName, nodeFactory = NoNMSNodeFactory)
}
sealed class Changed {
data class Added(val node: NodeInfo) : Changed() {
constructor(node: MockNode) : this(node.info)
constructor(node: StartedNode<*>) : this(node.info)
}
data class Removed(val node: NodeInfo) : Changed() {
constructor(node: MockNode) : this(node.info)
constructor(node: StartedNode<*>) : this(node.info)
}
}
@ -280,7 +282,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNode {
return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun makeNetworkMapService() {}
override fun makeNetworkMapService() = NullNetworkMapService
}
}
}

View File

@ -9,6 +9,7 @@ import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.CHARLIE
@ -28,11 +29,11 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Before
fun start() {
val nodes = startNodesWithPort(partiesList)
nodes.forEach { it.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting.
nodes.forEach { it.internals.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting.
nodes.forEach {
infos.add(it.info)
addressesMap[it.info.legalIdentity.name] = it.info.addresses[0]
it.stop() // We want them to communicate with NetworkMapService to save data to cache.
it.dispose() // We want them to communicate with NetworkMapService to save data to cache.
}
}
@ -63,7 +64,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val partyNodes = alice.services.networkMapCache.partyNodes
assert(NetworkMapService.type !in alice.info.advertisedServices.map { it.info.type })
assertEquals(null, alice.inNodeNetworkMapService)
assertEquals(NullNetworkMapService, alice.inNodeNetworkMapService)
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet())
}
@ -72,7 +73,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = true)
assert(nodes.all { it.inNodeNetworkMapService == null })
assert(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } })
nodes.forEach {
val partyNodes = it.services.networkMapCache.partyNodes
@ -86,7 +87,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
assert(nodes.all { it.inNodeNetworkMapService == null })
assert(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } })
nodes.forEach {
val partyNodes = it.services.networkMapCache.partyNodes
@ -123,13 +124,13 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
// Start Network Map and see that charlie node appears in caches.
val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0]
nms.startupComplete.get()
assert(nms.inNodeNetworkMapService != null)
nms.internals.startupComplete.get()
assert(nms.inNodeNetworkMapService != NullNetworkMapService)
assert(infos.any {it.legalIdentity == nms.info.legalIdentity})
otherNodes.forEach {
assert(nms.info.legalIdentity in it.services.networkMapCache.partyNodes.map { it.legalIdentity })
}
charlie.nodeReadyFuture.get() // Finish registration.
charlie.internals.nodeReadyFuture.get() // Finish registration.
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
val cacheA = otherNodes[0].services.networkMapCache.partyNodes
val cacheB = otherNodes[1].services.networkMapCache.partyNodes
@ -142,7 +143,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
// HELPERS
// Helper function to restart nodes with the same host and port.
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false): List<Node> {
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false): List<StartedNode<Node>> {
return nodesToStart.map { party ->
val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()
if (party == DUMMY_NOTARY) {
@ -158,10 +159,10 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
// Check that nodes are functional, communicate each with each.
private fun checkConnectivity(nodes: List<Node>) {
private fun checkConnectivity(nodes: List<StartedNode<*>>) {
nodes.forEach { node1 ->
nodes.forEach { node2 ->
node2.registerInitiatedFlow(SendBackFlow::class.java)
node2.internals.registerInitiatedFlow(SendBackFlow::class.java)
val resultFuture = node1.services.startFlow(SendFlow(node2.info.legalIdentity)).resultFuture
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
}

View File

@ -35,9 +35,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest<Persistent
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNode {
return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun makeNetworkMapService() {
inNodeNetworkMapService = SwizzleNetworkMapService(services)
}
override fun makeNetworkMapService() = SwizzleNetworkMapService(services)
}
}
}

View File

@ -13,11 +13,11 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash
import net.corda.node.internal.StartedNode
import net.corda.node.services.NotifyTransactionHandler
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.MEGA_CORP
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
@ -95,8 +95,8 @@ class DataVendingServiceTests {
}
}
private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
walletServiceNode.registerInitiatedFlow(InitiateNotifyTxFlow::class.java)
private fun StartedNode<*>.sendNotifyTx(tx: SignedTransaction, walletServiceNode: StartedNode<*>) {
walletServiceNode.internals.registerInitiatedFlow(InitiateNotifyTxFlow::class.java)
services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx))
mockNet.runNetwork()
}

View File

@ -27,7 +27,7 @@ class NodeSchemaServiceTest {
val mockNode = mockNet.createNode()
mockNet.runNetwork()
mockNode.registerCustomSchemas(setOf(DummyLinearStateSchemaV1))
mockNode.internals.registerCustomSchemas(setOf(DummyLinearStateSchemaV1))
val schemaService = mockNode.services.schemaService
assertTrue(schemaService.schemaOptions.containsKey(DummyLinearStateSchemaV1))

View File

@ -31,6 +31,7 @@ import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.StartedNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.transactions.ValidatingNotaryService
@ -42,6 +43,7 @@ import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.node.pumpReceive
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
@ -66,10 +68,10 @@ class FlowFrameworkTests {
private val mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin())
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private lateinit var node1: MockNode
private lateinit var node2: MockNode
private lateinit var notary1: MockNode
private lateinit var notary2: MockNode
private lateinit var node1: StartedNode<MockNode>
private lateinit var node2: StartedNode<MockNode>
private lateinit var notary1: StartedNode<MockNode>
private lateinit var notary2: StartedNode<MockNode>
@Before
fun start() {
@ -77,7 +79,7 @@ class FlowFrameworkTests {
node2 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
mockNet.runNetwork()
node1.ensureRegistered()
node1.internals.ensureRegistered()
// We intentionally create our own notary and ignore the one provided by the network
val notaryKeyPair = generateKeyPair()
@ -111,7 +113,7 @@ class FlowFrameworkTests {
@Test
fun `newly added flow is preserved on restart`() {
node1.services.startFlow(NoOpFlow(nonTerminating = true))
node1.acceptableLiveFiberCountOnStop = 1
node1.internals.acceptableLiveFiberCountOnStop = 1
val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>()
assertThat(restoredFlow.flowStarted).isTrue()
}
@ -149,9 +151,9 @@ class FlowFrameworkTests {
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
node2.disableDBCloseOnStop()
node2.acceptableLiveFiberCountOnStop = 1
node2.stop()
node2.internals.disableDBCloseOnStop()
node2.internals.acceptableLiveFiberCountOnStop = 1
node2.dispose()
mockNet.runNetwork()
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveFlow>(node1)
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
@ -173,22 +175,22 @@ class FlowFrameworkTests {
val flow = NoOpFlow()
node3.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
node3.disableDBCloseOnStop()
node3.internals.disableDBCloseOnStop()
node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
node3.stop()
node3.dispose()
node3 = mockNet.createNode(node1.network.myAddress, node3.id)
node3 = mockNet.createNode(node1.network.myAddress, node3.internals.id)
val restoredFlow = node3.getSingleFlow<NoOpFlow>().first
assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertEquals(true, restoredFlow.flowStarted) // Now we should have run the flow and hopefully cleared the init checkpoint
node3.disableDBCloseOnStop()
node3.internals.disableDBCloseOnStop()
node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
node3.stop()
node3.dispose()
// Now it is completed the flow should leave no Checkpoint.
node3 = mockNet.createNode(node1.network.myAddress, node3.id)
node3 = mockNet.createNode(node1.network.myAddress, node3.internals.id)
mockNet.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertTrue(node3.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
@ -200,8 +202,8 @@ class FlowFrameworkTests {
node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
node2.stop() // kill receiver
node2.internals.disableDBCloseOnStop()
node2.dispose() // kill receiver
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveFlow>(node1)
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@ -225,15 +227,15 @@ class FlowFrameworkTests {
}
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
node2.internals.disableDBCloseOnStop()
// Restart node and thus reload the checkpoint and resend the message with same UUID
node2.stop()
node2.dispose()
node2.database.transaction {
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
node2.services.networkMapCache.clearNetworkMapCache()
}
val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
node2.manuallyCloseDB()
val node2b = mockNet.createNode(node1.network.myAddress, node2.internals.id, advertisedServices = *node2.internals.advertisedServices.toTypedArray())
node2.internals.manuallyCloseDB()
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
mockNet.runNetwork()
@ -285,8 +287,8 @@ class FlowFrameworkTests {
//There's no session end from the other flows as they're manually suspended
)
node2.acceptableLiveFiberCountOnStop = 1
node3.acceptableLiveFiberCountOnStop = 1
node2.internals.acceptableLiveFiberCountOnStop = 1
node3.internals.acceptableLiveFiberCountOnStop = 1
}
@Test
@ -299,7 +301,7 @@ class FlowFrameworkTests {
node3.registerFlowFactory(ReceiveFlow::class) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating()
node1.services.startFlow(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
node1.internals.acceptableLiveFiberCountOnStop = 1
mockNet.runNetwork()
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload)
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload)
@ -360,32 +362,32 @@ class FlowFrameworkTests {
// First Pay
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
assertEquals(node1.id, it.from)
assertEquals(node1.internals.id, it.from)
assertEquals(notary1Address, it.to)
},
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
assertEquals(notary1.id, it.from)
assertEquals(notary1.internals.id, it.from)
},
// Second pay
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
assertEquals(node1.id, it.from)
assertEquals(node1.internals.id, it.from)
assertEquals(notary1Address, it.to)
},
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
assertEquals(notary2.id, it.from)
assertEquals(notary2.internals.id, it.from)
},
// Third pay
expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
assertEquals(node1.id, it.from)
assertEquals(node1.internals.id, it.from)
assertEquals(notary1Address, it.to)
},
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
assertEquals(it.from, notary1.id)
assertEquals(it.from, notary1.internals.id)
}
)
}
@ -740,26 +742,26 @@ class FlowFrameworkTests {
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P {
private inline fun <reified P : FlowLogic<*>> StartedNode<MockNode>.restartAndGetRestoredFlow(networkMapNode: StartedNode<*>? = null) = internals.run {
disableDBCloseOnStop() // Handover DB to new node copy
stop()
val newNode = mockNet.createNode(networkMapNode?.network?.myAddress, id, advertisedServices = *advertisedServices.toTypedArray())
newNode.acceptableLiveFiberCountOnStop = 1
newNode.internals.acceptableLiveFiberCountOnStop = 1
manuallyCloseDB()
mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine
return newNode.getSingleFlow<P>().first
newNode.getSingleFlow<P>().first
}
private inline fun <reified P : FlowLogic<*>> MockNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
private inline fun <reified P : FlowLogic<*>> StartedNode<*>.getSingleFlow(): Pair<P, CordaFuture<*>> {
return smm.findStateMachines(P::class.java).single()
}
private inline fun <reified P : FlowLogic<*>> MockNode.registerFlowFactory(
private inline fun <reified P : FlowLogic<*>> StartedNode<*>.registerFlowFactory(
initiatingFlowClass: KClass<out FlowLogic<*>>,
initiatedFlowVersion: Int = 1,
noinline flowFactory: (Party) -> P): CordaFuture<P>
{
val observable = internalRegisterFlowFactory(
val observable = internals.internalRegisterFlowFactory(
initiatingFlowClass.java,
InitiatedFlowFactory.CorDapp(initiatedFlowVersion, "", flowFactory),
P::class.java,
@ -775,7 +777,7 @@ class FlowFrameworkTests {
private val normalEnd = NormalSessionEnd(0)
private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse)
private fun MockNode.sendSessionMessage(message: SessionMessage, destination: MockNode) {
private fun StartedNode<*>.sendSessionMessage(message: SessionMessage, destination: StartedNode<*>) {
services.networkService.apply {
val address = getAddressOfParty(PartyInfo.Node(destination.info))
send(createMessage(StateMachineManager.sessionTopic, message.serialize().bytes), address)
@ -786,8 +788,8 @@ class FlowFrameworkTests {
assertThat(receivedSessionMessages).containsExactly(*expected)
}
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = receivedSessionMessages.filter { it.from == node.id || it.to == node.network.myAddress }
private fun assertSessionTransfers(node: StartedNode<MockNode>, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
assertThat(actualForNode).containsExactly(*expected)
return actualForNode
}
@ -818,8 +820,8 @@ class FlowFrameworkTests {
else -> message
}
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
private infix fun StartedNode<MockNode>.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
private infix fun Pair<Int, SessionMessage>.to(node: StartedNode<*>): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>> get() {
return progressTracker!!.changes

View File

@ -12,7 +12,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.contracts.DummyContract
@ -29,8 +29,8 @@ import kotlin.test.assertFailsWith
class NotaryServiceTests {
lateinit var mockNet: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
lateinit var clientNode: StartedNode<MockNetwork.MockNode>
@Before
fun setup() {
@ -40,7 +40,7 @@ class NotaryServiceTests {
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
clientNode = mockNet.createNode(notaryNode.network.myAddress)
mockNet.runNetwork() // Clear network map registration messages
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
}
@After
@ -153,7 +153,7 @@ class NotaryServiceTests {
return future
}
fun issueState(node: AbstractNode): StateAndRef<*> {
fun issueState(node: StartedNode<*>): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
val signedByNode = node.services.signInitialTransaction(tx)
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)

View File

@ -12,7 +12,7 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.services.issueInvalidState
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.DUMMY_NOTARY
@ -30,8 +30,8 @@ import kotlin.test.assertFailsWith
class ValidatingNotaryServiceTests {
lateinit var mockNet: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
lateinit var clientNode: StartedNode<MockNetwork.MockNode>
@Before
fun setup() {
@ -42,7 +42,7 @@ class ValidatingNotaryServiceTests {
)
clientNode = mockNet.createNode(notaryNode.network.myAddress)
mockNet.runNetwork() // Clear network map registration messages
notaryNode.ensureRegistered()
notaryNode.internals.ensureRegistered()
}
@After
@ -96,7 +96,7 @@ class ValidatingNotaryServiceTests {
return future
}
fun issueState(node: AbstractNode): StateAndRef<*> {
fun issueState(node: StartedNode<*>): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
val signedByNode = node.services.signInitialTransaction(tx)
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)