From cfd6739d236a4cdd5b135fea9ab87d4210a6db8b Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Wed, 13 Sep 2017 17:34:52 +0100 Subject: [PATCH] Introduce StartedNode (#1491) --- .../client/rpc/CordaRPCJavaClientTest.java | 11 +-- .../corda/client/rpc/CordaRPCClientTest.kt | 7 +- .../net/corda/core/internal/InternalUtils.kt | 3 + .../net/corda/core/flows/FlowsInJavaTest.java | 9 +- .../net/corda/core/flows/AttachmentTests.kt | 28 +++---- .../core/flows/CollectSignaturesFlowTests.kt | 11 +-- .../core/flows/ContractUpgradeFlowTest.kt | 13 ++- .../net/corda/core/flows/FinalityFlowTests.kt | 7 +- .../corda/core/flows/IdentitySyncFlowTests.kt | 2 +- .../core/flows/ManualFinalityFlowTests.kt | 9 +- .../internal/ResolveTransactionsFlowTest.kt | 9 +- .../AttachmentSerializationTest.kt | 33 ++++---- .../net/corda/docs/CustomVaultQueryTest.kt | 15 ++-- .../docs/FxTransactionBuildTutorialTest.kt | 13 +-- .../WorkflowTransactionBuildTutorialTest.kt | 9 +- .../corda/finance/flows/CashExitFlowTests.kt | 5 +- .../corda/finance/flows/CashIssueFlowTests.kt | 5 +- .../finance/flows/CashPaymentFlowTests.kt | 5 +- .../node/services/BFTNotaryServiceTests.kt | 4 +- .../node/services/RaftNotaryServiceTests.kt | 4 +- .../statemachine/FlowVersioningTest.kt | 2 +- .../messaging/MQSecurityAsNodeTest.kt | 14 ++-- .../services/messaging/MQSecurityAsRPCTest.kt | 4 +- .../services/messaging/MQSecurityTest.kt | 9 +- .../services/messaging/P2PMessagingTest.kt | 14 ++-- .../services/messaging/P2PSecurityTest.kt | 4 +- .../net/corda/node/internal/AbstractNode.kt | 80 ++++++++++-------- .../kotlin/net/corda/node/internal/Node.kt | 13 +-- .../net/corda/node/internal/NodeStartup.kt | 12 ++- .../net/corda/node/internal/StartedNode.kt | 25 ++++++ .../services/network/NetworkMapService.kt | 2 + .../net/corda/node/shell/InteractiveShell.kt | 9 +- .../net/corda/node/CordaRPCOpsImplTest.kt | 7 +- .../node/messaging/TwoPartyTradeFlowTests.kt | 61 +++++++------- .../corda/node/services/NotaryChangeTests.kt | 20 ++--- .../services/events/ScheduledFlowTests.kt | 16 ++-- .../network/AbstractNetworkMapServiceTest.kt | 28 ++++--- .../network/PersistentNetworkMapCacheTest.kt | 23 ++--- .../PersistentNetworkMapServiceTest.kt | 4 +- .../persistence/DataVendingServiceTests.kt | 6 +- .../services/schema/NodeSchemaServiceTest.kt | 2 +- .../statemachine/FlowFrameworkTests.kt | 84 ++++++++++--------- .../transactions/NotaryServiceTests.kt | 10 +-- .../ValidatingNotaryServiceTests.kt | 10 +-- .../corda/irs/api/NodeInterestRatesTest.kt | 6 +- .../net/corda/netmap/NetworkMapVisualiser.kt | 4 +- .../net/corda/netmap/VisualiserViewModel.kt | 2 +- .../corda/netmap/simulation/IRSSimulation.kt | 28 +++---- .../net/corda/netmap/simulation/Simulation.kt | 20 +++-- .../netmap/simulation/IRSSimulationTest.kt | 2 +- .../net/corda/traderdemo/TraderDemoTest.kt | 10 +-- .../kotlin/net/corda/testing/driver/Driver.kt | 14 ++-- .../kotlin/net/corda/testing/node/MockNode.kt | 73 +++++++++++----- .../net/corda/testing/node/NodeBasedTest.kt | 31 ++++--- 54 files changed, 468 insertions(+), 383 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/internal/StartedNode.kt diff --git a/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java index e483d8ad6a..149bbf8a81 100644 --- a/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java +++ b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java @@ -12,6 +12,7 @@ import net.corda.finance.flows.CashIssueFlow; import net.corda.finance.flows.CashPaymentFlow; import net.corda.finance.schemas.*; import net.corda.node.internal.Node; +import net.corda.node.internal.StartedNode; import net.corda.node.services.transactions.ValidatingNotaryService; import net.corda.nodeapi.User; import net.corda.testing.node.NodeBasedTest; @@ -38,7 +39,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest { private Set permSet = new HashSet<>(perms); private User rpcUser = new User("user1", "test", permSet); - private Node node; + private StartedNode node; private CordaRPCClient client; private RPCClient.RPCConnection connection = null; private CordaRPCOps rpcProxy; @@ -51,10 +52,10 @@ public class CordaRPCJavaClientTest extends NodeBasedTest { @Before public void setUp() throws ExecutionException, InterruptedException { Set services = new HashSet<>(singletonList(new ServiceInfo(ValidatingNotaryService.Companion.getType(), null))); - CordaFuture nodeFuture = startNode(getALICE().getName(), 1, services, singletonList(rpcUser), emptyMap()); + CordaFuture> nodeFuture = startNode(getALICE().getName(), 1, services, singletonList(rpcUser), emptyMap()); node = nodeFuture.get(); - node.registerCustomSchemas(Collections.singleton(CashSchemaV1.INSTANCE)); - client = new CordaRPCClient(requireNonNull(node.getConfiguration().getRpcAddress()), null, getDefault(), false); + node.getInternals().registerCustomSchemas(Collections.singleton(CashSchemaV1.INSTANCE)); + client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcAddress()), null, getDefault(), false); } @After @@ -73,7 +74,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest { FlowHandle flowHandle = rpcProxy.startFlowDynamic(CashIssueFlow.class, DOLLARS(123), OpaqueBytes.of("1".getBytes()), - node.info.getLegalIdentity()); + node.getInfo().getLegalIdentity()); System.out.println("Started issuing cash, waiting on result"); flowHandle.getReturnValue().get(); diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index 86391ea84a..504e7f1601 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -15,6 +15,7 @@ import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.node.services.FlowPermissions.Companion.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.nodeapi.User @@ -34,7 +35,7 @@ class CordaRPCClientTest : NodeBasedTest() { startFlowPermission(), startFlowPermission() )) - private lateinit var node: Node + private lateinit var node: StartedNode private lateinit var client: CordaRPCClient private var connection: CordaRPCConnection? = null @@ -45,8 +46,8 @@ class CordaRPCClientTest : NodeBasedTest() { @Before fun setUp() { node = startNode(ALICE.name, rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow() - node.registerCustomSchemas(setOf(CashSchemaV1)) - client = CordaRPCClient(node.configuration.rpcAddress!!, initialiseSerialization = false) + node.internals.registerCustomSchemas(setOf(CashSchemaV1)) + client = CordaRPCClient(node.internals.configuration.rpcAddress!!, initialiseSerialization = false) } @After diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index e66ca03ba8..c1e780ba7e 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -266,3 +266,6 @@ class DeclaredField(clazz: Class<*>, name: String, private val receiver: Any? @Retention(AnnotationRetention.SOURCE) @MustBeDocumented annotation class VisibleForTesting + +@Suppress("UNCHECKED_CAST") +fun uncheckedCast(obj: T) = obj as U diff --git a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java index af255b2938..460552127f 100644 --- a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java +++ b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java @@ -3,6 +3,7 @@ package net.corda.core.flows; import co.paralleluniverse.fibers.Suspendable; import com.google.common.primitives.Primitives; import net.corda.core.identity.Party; +import net.corda.node.internal.StartedNode; import net.corda.testing.node.MockNetwork; import org.junit.After; import org.junit.Before; @@ -17,8 +18,8 @@ import static org.junit.Assert.fail; public class FlowsInJavaTest { private final MockNetwork mockNet = new MockNetwork(); - private MockNetwork.MockNode node1; - private MockNetwork.MockNode node2; + private StartedNode node1; + private StartedNode node2; @Before public void setUp() throws Exception { @@ -27,7 +28,7 @@ public class FlowsInJavaTest { node2 = someNodes.getPartyNodes().get(1); mockNet.runNetwork(); // Ensure registration was successful - node1.getNodeReadyFuture().get(); + node1.getInternals().getNodeReadyFuture().get(); } @After @@ -37,7 +38,7 @@ public class FlowsInJavaTest { @Test public void suspendableActionInsideUnwrap() throws Exception { - node2.registerInitiatedFlow(SendHelloAndThenReceive.class); + node2.getInternals().registerInitiatedFlow(SendHelloAndThenReceive.class); Future result = node1.getServices().startFlow(new SendInUnwrapFlow(node2.getInfo().getLegalIdentity())).getResultFuture(); mockNet.runNetwork(); assertThat(result.get()).isEqualTo("Hello"); diff --git a/core/src/test/kotlin/net/corda/core/flows/AttachmentTests.kt b/core/src/test/kotlin/net/corda/core/flows/AttachmentTests.kt index 378ed0b0f1..88b4a90028 100644 --- a/core/src/test/kotlin/net/corda/core/flows/AttachmentTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/AttachmentTests.kt @@ -10,6 +10,7 @@ import net.corda.core.internal.FetchDataFlow import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.ServiceInfo import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.NodeAttachmentService @@ -59,10 +60,10 @@ class AttachmentTests { // Ensure that registration was successful before progressing any further mockNet.runNetwork() - n0.ensureRegistered() + n0.internals.ensureRegistered() - n0.registerInitiatedFlow(FetchAttachmentsResponse::class.java) - n1.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n0.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n1.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) // Insert an attachment into node zero's store directly. val id = n0.database.transaction { @@ -83,7 +84,7 @@ class AttachmentTests { assertEquals(id, attachment.open().readBytes().sha256()) // Shut down node zero and ensure node one can still resolve the attachment. - n0.stop() + n0.dispose() val response: FetchDataFlow.Result = n1.startAttachmentFlow(setOf(id), n0.info.legalIdentity).resultFuture.getOrThrow() assertEquals(attachment, response.fromDisk[0]) @@ -97,10 +98,10 @@ class AttachmentTests { // Ensure that registration was successful before progressing any further mockNet.runNetwork() - n0.ensureRegistered() + n0.internals.ensureRegistered() - n0.registerInitiatedFlow(FetchAttachmentsResponse::class.java) - n1.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n0.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n1.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) // Get node one to fetch a non-existent attachment. val hash = SecureHash.randomSHA256() @@ -120,10 +121,7 @@ class AttachmentTests { overrideServices: Map?, entropyRoot: BigInteger): MockNetwork.MockNode { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - override fun start() { - super.start() - attachments.checkAttachmentsOnLoad = false - } + override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = false } } } }, advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type))) @@ -131,10 +129,10 @@ class AttachmentTests { // Ensure that registration was successful before progressing any further mockNet.runNetwork() - n0.ensureRegistered() + n0.internals.ensureRegistered() - n0.registerInitiatedFlow(FetchAttachmentsResponse::class.java) - n1.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n0.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) + n1.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java) val attachment = fakeAttachment() // Insert an attachment into node zero's store directly. @@ -158,7 +156,7 @@ class AttachmentTests { assertFailsWith { f1.resultFuture.getOrThrow() } } - private fun MockNetwork.MockNode.startAttachmentFlow(hashes: Set, otherSide: Party) = services.startFlow(InitiatingFetchAttachmentsFlow(otherSide, hashes)) + private fun StartedNode<*>.startAttachmentFlow(hashes: Set, otherSide: Party) = services.startFlow(InitiatingFetchAttachmentsFlow(otherSide, hashes)) @InitiatingFlow private class InitiatingFetchAttachmentsFlow(val otherSide: Party, val hashes: Set) : FlowLogic>() { diff --git a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt index af17e33e6b..9dea137bda 100644 --- a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt @@ -10,6 +10,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap +import net.corda.node.internal.StartedNode import net.corda.testing.MINI_CORP_KEY import net.corda.testing.contracts.DUMMY_PROGRAM_ID import net.corda.testing.contracts.DummyContract @@ -23,9 +24,9 @@ import kotlin.test.assertFailsWith class CollectSignaturesFlowTests { lateinit var mockNet: MockNetwork - lateinit var a: MockNetwork.MockNode - lateinit var b: MockNetwork.MockNode - lateinit var c: MockNetwork.MockNode + lateinit var a: StartedNode + lateinit var b: StartedNode + lateinit var c: StartedNode lateinit var notary: Party val services = MockServices() @@ -38,7 +39,7 @@ class CollectSignaturesFlowTests { c = nodes.partyNodes[2] notary = nodes.notaryNode.info.notaryIdentity mockNet.runNetwork() - a.ensureRegistered() + a.internals.ensureRegistered() } @After @@ -48,7 +49,7 @@ class CollectSignaturesFlowTests { private fun registerFlowOnAllNodes(flowClass: KClass>) { listOf(a, b, c).forEach { - it.registerInitiatedFlow(flowClass.java) + it.internals.registerInitiatedFlow(flowClass.java) } } diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index db69d2981b..fe1b51e7c6 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -18,6 +18,7 @@ import net.corda.finance.contracts.asset.CASH_PROGRAM_ID import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueFlow import net.corda.node.internal.CordaRPCOpsImpl +import net.corda.node.internal.StartedNode import net.corda.node.services.FlowPermissions.Companion.startFlowPermission import net.corda.nodeapi.User import net.corda.testing.RPCDriverExposedDSLInterface @@ -37,8 +38,8 @@ import kotlin.test.assertTrue class ContractUpgradeFlowTest { lateinit var mockNet: MockNetwork - lateinit var a: MockNetwork.MockNode - lateinit var b: MockNetwork.MockNode + lateinit var a: StartedNode + lateinit var b: StartedNode lateinit var notary: Party @Before @@ -50,7 +51,7 @@ class ContractUpgradeFlowTest { // Process registration mockNet.runNetwork() - a.ensureRegistered() + a.internals.ensureRegistered() notary = nodes.notaryNode.info.notaryIdentity @@ -107,7 +108,7 @@ class ContractUpgradeFlowTest { val result = resultFuture.getOrThrow() - fun check(node: MockNetwork.MockNode) { + fun check(node: StartedNode<*>) { val nodeStx = node.database.transaction { node.services.validatedTransactions.getTransaction(result.ref.txhash) } @@ -127,7 +128,7 @@ class ContractUpgradeFlowTest { check(b) } - private fun RPCDriverExposedDSLInterface.startProxy(node: MockNetwork.MockNode, user: User): CordaRPCOps { + private fun RPCDriverExposedDSLInterface.startProxy(node: StartedNode<*>, user: User): CordaRPCOps { return startRpcClient( rpcAddress = startRpcServer( rpcUser = user, @@ -235,8 +236,6 @@ class ContractUpgradeFlowTest { assertEquals>(listOf(anonymisedRecipient), (firstState.state.data as CashV2.State).owners, "Upgraded cash belongs to the right owner.") } - val CASHV2_PROGRAM_ID = "net.corda.core.flows.ContractUpgradeFlowTest.CashV2" - class CashV2 : UpgradedContract { override val legacyContract = CASH_PROGRAM_ID diff --git a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt index 3ab74af1c7..92b822fc47 100644 --- a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt @@ -8,6 +8,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.GBP import net.corda.finance.contracts.asset.Cash import net.corda.testing.ALICE +import net.corda.node.internal.StartedNode import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockServices import org.junit.After @@ -18,8 +19,8 @@ import kotlin.test.assertFailsWith class FinalityFlowTests { lateinit var mockNet: MockNetwork - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode lateinit var notary: Party val services = MockServices() @@ -31,7 +32,7 @@ class FinalityFlowTests { nodeB = nodes.partyNodes[1] notary = nodes.notaryNode.info.notaryIdentity mockNet.runNetwork() - nodeA.ensureRegistered() + nodeA.internals.ensureRegistered() } @After diff --git a/core/src/test/kotlin/net/corda/core/flows/IdentitySyncFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/IdentitySyncFlowTests.kt index 401b864fb1..e6b46bd086 100644 --- a/core/src/test/kotlin/net/corda/core/flows/IdentitySyncFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/IdentitySyncFlowTests.kt @@ -41,7 +41,7 @@ class IdentitySyncFlowTests { val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name) val alice: Party = aliceNode.services.myInfo.legalIdentity val bob: Party = bobNode.services.myInfo.legalIdentity - bobNode.registerInitiatedFlow(Receive::class.java) + bobNode.internals.registerInitiatedFlow(Receive::class.java) // Alice issues then pays some cash to a new confidential identity that Bob doesn't know about val anonymous = true diff --git a/core/src/test/kotlin/net/corda/core/flows/ManualFinalityFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/ManualFinalityFlowTests.kt index f872cecb8b..2feda78123 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ManualFinalityFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ManualFinalityFlowTests.kt @@ -7,6 +7,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.finance.GBP import net.corda.finance.contracts.asset.Cash +import net.corda.node.internal.StartedNode import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockServices import org.junit.After @@ -17,9 +18,9 @@ import kotlin.test.assertNull class ManualFinalityFlowTests { lateinit var mockNet: MockNetwork - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode - lateinit var nodeC: MockNetwork.MockNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode + lateinit var nodeC: StartedNode lateinit var notary: Party val services = MockServices() @@ -32,7 +33,7 @@ class ManualFinalityFlowTests { nodeC = nodes.partyNodes[2] notary = nodes.notaryNode.info.notaryIdentity mockNet.runNetwork() - nodeA.ensureRegistered() + nodeA.internals.ensureRegistered() } @After diff --git a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt index 6ead35bc90..117aa89f7e 100644 --- a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt @@ -10,6 +10,7 @@ import net.corda.core.identity.Party import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.sequence +import net.corda.node.internal.StartedNode import net.corda.testing.DUMMY_NOTARY_KEY import net.corda.testing.MEGA_CORP import net.corda.testing.MEGA_CORP_KEY @@ -31,8 +32,8 @@ import kotlin.test.assertNull class ResolveTransactionsFlowTest { lateinit var mockNet: MockNetwork - lateinit var a: MockNetwork.MockNode - lateinit var b: MockNetwork.MockNode + lateinit var a: StartedNode + lateinit var b: StartedNode lateinit var notary: Party val megaCorpServices = MockServices(MEGA_CORP_KEY) val notaryServices = MockServices(DUMMY_NOTARY_KEY) @@ -43,8 +44,8 @@ class ResolveTransactionsFlowTest { val nodes = mockNet.createSomeNodes() a = nodes.partyNodes[0] b = nodes.partyNodes[1] - a.registerInitiatedFlow(TestResponseFlow::class.java) - b.registerInitiatedFlow(TestResponseFlow::class.java) + a.internals.registerInitiatedFlow(TestResponseFlow::class.java) + b.internals.registerInitiatedFlow(TestResponseFlow::class.java) notary = nodes.notaryNode.info.notaryIdentity mockNet.runNetwork() } diff --git a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt index ab38c1a545..49b415fe4e 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt @@ -9,12 +9,12 @@ import net.corda.core.flows.TestDataVendingFlow import net.corda.core.identity.Party import net.corda.core.internal.FetchAttachmentsFlow import net.corda.core.internal.FetchDataFlow -import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.ServiceInfo import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.NodeAttachmentService @@ -42,10 +42,10 @@ private fun createAttachmentData(content: String) = ByteArrayOutputStream().appl private fun Attachment.extractContent() = ByteArrayOutputStream().apply { extractFile("content", this) }.toString(UTF_8.name()) -private fun MockNetwork.MockNode.saveAttachment(content: String) = database.transaction { +private fun StartedNode<*>.saveAttachment(content: String) = database.transaction { attachments.importAttachment(createAttachmentData(content).inputStream()) } -private fun MockNetwork.MockNode.hackAttachment(attachmentId: SecureHash, content: String) = database.transaction { +private fun StartedNode<*>.hackAttachment(attachmentId: SecureHash, content: String) = database.transaction { updateAttachment(attachmentId, createAttachmentData(content)) } @@ -63,17 +63,17 @@ private fun updateAttachment(attachmentId: SecureHash, data: ByteArray) { class AttachmentSerializationTest { private lateinit var mockNet: MockNetwork - private lateinit var server: MockNetwork.MockNode - private lateinit var client: MockNetwork.MockNode + private lateinit var server: StartedNode + private lateinit var client: StartedNode @Before fun setUp() { mockNet = MockNetwork() server = mockNet.createNode(advertisedServices = ServiceInfo(NetworkMapService.type)) client = mockNet.createNode(server.network.myAddress) - client.disableDBCloseOnStop() // Otherwise the in-memory database may disappear (taking the checkpoint with it) while we reboot the client. + client.internals.disableDBCloseOnStop() // Otherwise the in-memory database may disappear (taking the checkpoint with it) while we reboot the client. mockNet.runNetwork() - server.ensureRegistered() + server.internals.ensureRegistered() } @After @@ -95,7 +95,7 @@ class AttachmentSerializationTest { private class ClientResult(internal val attachmentContent: String) @InitiatingFlow - private abstract class ClientLogic(server: MockNetwork.MockNode) : FlowLogic() { + private abstract class ClientLogic(server: StartedNode<*>) : FlowLogic() { internal val server = server.info.legalIdentity @Suspendable @@ -116,7 +116,7 @@ class AttachmentSerializationTest { override val signers get() = throw UnsupportedOperationException() } - private class CustomAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash, private val customContent: String) : ClientLogic(server) { + private class CustomAttachmentLogic(server: StartedNode<*>, private val attachmentId: SecureHash, private val customContent: String) : ClientLogic(server) { @Suspendable override fun getAttachmentContent(): String { val customAttachment = CustomAttachment(attachmentId, customContent) @@ -125,7 +125,7 @@ class AttachmentSerializationTest { } } - private class OpenAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash) : ClientLogic(server) { + private class OpenAttachmentLogic(server: StartedNode<*>, private val attachmentId: SecureHash) : ClientLogic(server) { @Suspendable override fun getAttachmentContent(): String { val localAttachment = serviceHub.attachments.openAttachment(attachmentId)!! @@ -134,7 +134,7 @@ class AttachmentSerializationTest { } } - private class FetchAttachmentLogic(server: MockNetwork.MockNode, private val attachmentId: SecureHash) : ClientLogic(server) { + private class FetchAttachmentLogic(server: StartedNode<*>, private val attachmentId: SecureHash) : ClientLogic(server) { @Suspendable override fun getAttachmentContent(): String { val (downloadedAttachment) = subFlow(FetchAttachmentsFlow(setOf(attachmentId), server)).downloaded @@ -145,7 +145,7 @@ class AttachmentSerializationTest { } private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) { - server.internalRegisterFlowFactory( + server.internals.internalRegisterFlowFactory( ClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, sendData) }, ServerLogic::class.java, @@ -155,16 +155,13 @@ class AttachmentSerializationTest { } private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String { - client.stop() - client = mockNet.createNode(server.network.myAddress, client.id, object : MockNetwork.Factory { + client.dispose() + client = mockNet.createNode(server.network.myAddress, client.internals.id, object : MockNetwork.Factory { override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set, id: Int, overrideServices: Map?, entropyRoot: BigInteger): MockNetwork.MockNode { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - override fun startMessagingService(rpcOps: RPCOps) { - attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad - super.startMessagingService(rpcOps) - } + override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad } } } }) diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt index cc019543b9..2fd61f2065 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt @@ -7,6 +7,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.* import net.corda.finance.contracts.getCashBalances import net.corda.finance.flows.CashIssueFlow +import net.corda.node.internal.StartedNode import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.ValidatingNotaryService @@ -22,9 +23,9 @@ import java.util.* class CustomVaultQueryTest { lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode @Before fun setup() { @@ -37,10 +38,10 @@ class CustomVaultQueryTest { nodeA = mockNet.createPartyNode(notaryNode.network.myAddress) nodeB = mockNet.createPartyNode(notaryNode.network.myAddress) - nodeA.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java) - nodeA.installCordaService(CustomVaultQuery.Service::class.java) - nodeA.registerCustomSchemas(setOf(CashSchemaV1)) - nodeB.registerCustomSchemas(setOf(CashSchemaV1)) + nodeA.internals.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java) + nodeA.internals.installCordaService(CustomVaultQuery.Service::class.java) + nodeA.internals.registerCustomSchemas(setOf(CashSchemaV1)) + nodeB.internals.registerCustomSchemas(setOf(CashSchemaV1)) } @After diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/FxTransactionBuildTutorialTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/FxTransactionBuildTutorialTest.kt index 318974128b..66762eca6e 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/FxTransactionBuildTutorialTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/FxTransactionBuildTutorialTest.kt @@ -7,6 +7,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.* import net.corda.finance.contracts.getCashBalances import net.corda.finance.flows.CashIssueFlow +import net.corda.node.internal.StartedNode import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.ValidatingNotaryService @@ -20,9 +21,9 @@ import kotlin.test.assertEquals class FxTransactionBuildTutorialTest { lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode @Before fun setup() { @@ -34,9 +35,9 @@ class FxTransactionBuildTutorialTest { advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), notaryService)) nodeA = mockNet.createPartyNode(notaryNode.network.myAddress) nodeB = mockNet.createPartyNode(notaryNode.network.myAddress) - nodeA.registerCustomSchemas(setOf(CashSchemaV1)) - nodeB.registerCustomSchemas(setOf(CashSchemaV1)) - nodeB.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java) + nodeA.internals.registerCustomSchemas(setOf(CashSchemaV1)) + nodeB.internals.registerCustomSchemas(setOf(CashSchemaV1)) + nodeB.internals.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java) } @After diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt index 91bca876cc..2dc45f8639 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt @@ -9,6 +9,7 @@ import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.toFuture import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.testing.DUMMY_NOTARY @@ -21,9 +22,9 @@ import kotlin.test.assertEquals class WorkflowTransactionBuildTutorialTest { lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode // Helper method to locate the latest Vault version of a LinearState private inline fun ServiceHub.latest(ref: UniqueIdentifier): StateAndRef { @@ -41,7 +42,7 @@ class WorkflowTransactionBuildTutorialTest { advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), notaryService)) nodeA = mockNet.createPartyNode(notaryNode.network.myAddress) nodeB = mockNet.createPartyNode(notaryNode.network.myAddress) - nodeA.registerInitiatedFlow(RecordCompletionFlow::class.java) + nodeA.internals.registerInitiatedFlow(RecordCompletionFlow::class.java) } @After diff --git a/finance/src/test/kotlin/net/corda/finance/flows/CashExitFlowTests.kt b/finance/src/test/kotlin/net/corda/finance/flows/CashExitFlowTests.kt index 454d7bcf22..9e4d56f02f 100644 --- a/finance/src/test/kotlin/net/corda/finance/flows/CashExitFlowTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/flows/CashExitFlowTests.kt @@ -6,6 +6,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS import net.corda.finance.`issued by` import net.corda.finance.contracts.asset.Cash +import net.corda.node.internal.StartedNode import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode @@ -19,9 +20,9 @@ class CashExitFlowTests { private val mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin()) private val initialBalance = 2000.DOLLARS private val ref = OpaqueBytes.of(0x01) - private lateinit var bankOfCordaNode: MockNode + private lateinit var bankOfCordaNode: StartedNode private lateinit var bankOfCorda: Party - private lateinit var notaryNode: MockNode + private lateinit var notaryNode: StartedNode private lateinit var notary: Party @Before diff --git a/finance/src/test/kotlin/net/corda/finance/flows/CashIssueFlowTests.kt b/finance/src/test/kotlin/net/corda/finance/flows/CashIssueFlowTests.kt index 92ba4e8ae6..adbb4b0723 100644 --- a/finance/src/test/kotlin/net/corda/finance/flows/CashIssueFlowTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/flows/CashIssueFlowTests.kt @@ -6,6 +6,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS import net.corda.finance.`issued by` import net.corda.finance.contracts.asset.Cash +import net.corda.node.internal.StartedNode import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode @@ -17,9 +18,9 @@ import kotlin.test.assertFailsWith class CashIssueFlowTests { private val mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin()) - private lateinit var bankOfCordaNode: MockNode + private lateinit var bankOfCordaNode: StartedNode private lateinit var bankOfCorda: Party - private lateinit var notaryNode: MockNode + private lateinit var notaryNode: StartedNode private lateinit var notary: Party @Before diff --git a/finance/src/test/kotlin/net/corda/finance/flows/CashPaymentFlowTests.kt b/finance/src/test/kotlin/net/corda/finance/flows/CashPaymentFlowTests.kt index 0e2171f603..600178dbf9 100644 --- a/finance/src/test/kotlin/net/corda/finance/flows/CashPaymentFlowTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/flows/CashPaymentFlowTests.kt @@ -9,6 +9,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS import net.corda.finance.`issued by` import net.corda.finance.contracts.asset.Cash +import net.corda.node.internal.StartedNode import net.corda.testing.expect import net.corda.testing.expectEvents import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin @@ -24,9 +25,9 @@ class CashPaymentFlowTests { private val mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin()) private val initialBalance = 2000.DOLLARS private val ref = OpaqueBytes.of(0x01) - private lateinit var bankOfCordaNode: MockNode + private lateinit var bankOfCordaNode: StartedNode private lateinit var bankOfCorda: Party - private lateinit var notaryNode: MockNode + private lateinit var notaryNode: StartedNode private lateinit var notary: Party @Before diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt index c651218abf..b08a75bff8 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -16,7 +16,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.Try import net.corda.core.utilities.getOrThrow -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.config.BFTSMaRtConfiguration import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.BFTNonValidatingNotaryService @@ -139,7 +139,7 @@ class BFTNotaryServiceTests { } } -private fun AbstractNode.signInitialTransaction( +private fun StartedNode<*>.signInitialTransaction( notary: Party, block: TransactionBuilder.() -> Any? ): SignedTransaction { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt index 45221d65c9..b5f4f3d768 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt @@ -11,7 +11,7 @@ import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.transpose import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.testing.DUMMY_BANK_A import net.corda.testing.contracts.DUMMY_PROGRAM_ID import net.corda.testing.contracts.DummyContract @@ -58,7 +58,7 @@ class RaftNotaryServiceTests : NodeBasedTest() { assertEquals(error.txId, secondSpendTx.id) } - private fun issueState(node: AbstractNode, notary: Party): StateAndRef<*> { + private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> { return node.database.transaction { val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0)) val stx = node.services.signInitialTransaction(builder) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt index 2dafbab440..1b271eae18 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt @@ -19,7 +19,7 @@ class FlowVersioningTest : NodeBasedTest() { val (alice, bob) = listOf( startNode(ALICE.name, platformVersion = 2), startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow() - bob.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow) + bob.internals.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow) val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow( PretendInitiatingCoreFlow(bob.info.legalIdentity)).resultFuture.getOrThrow() assertThat(alicePlatformVersionAccordingToBob).isEqualTo(2) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt index b5a42ecfba..b885c3129d 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt @@ -28,7 +28,7 @@ import java.nio.file.Files */ class MQSecurityAsNodeTest : MQSecurityTest() { override fun createAttacker(): SimpleMQClient { - return clientTo(alice.configuration.p2pAddress) + return clientTo(alice.internals.configuration.p2pAddress) } override fun startAttacker(attacker: SimpleMQClient) { @@ -42,7 +42,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { @Test fun `only the node running the broker can login using the special node user`() { - val attacker = clientTo(alice.configuration.p2pAddress) + val attacker = clientTo(alice.internals.configuration.p2pAddress) assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { attacker.start(NODE_USER, NODE_USER) } @@ -50,7 +50,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { @Test fun `login as the default cluster user`() { - val attacker = clientTo(alice.configuration.p2pAddress) + val attacker = clientTo(alice.internals.configuration.p2pAddress) assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy { attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword()) } @@ -58,7 +58,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { @Test fun `login without a username and password`() { - val attacker = clientTo(alice.configuration.p2pAddress) + val attacker = clientTo(alice.internals.configuration.p2pAddress) assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { attacker.start() } @@ -66,7 +66,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { @Test fun `login to a non ssl port as a node user`() { - val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null) + val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null) assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { attacker.start(NODE_USER, NODE_USER, enableSSL = false) } @@ -74,7 +74,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { @Test fun `login to a non ssl port as a peer user`() { - val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null) + val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null) assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { attacker.start(PEER_USER, PEER_USER, enableSSL = false) // Login as a peer } @@ -128,7 +128,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() { } } - val attacker = clientTo(alice.configuration.p2pAddress, sslConfig) + val attacker = clientTo(alice.internals.configuration.p2pAddress, sslConfig) assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { attacker.start(PEER_USER, PEER_USER) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt index ca458158ed..5ea185cf7b 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt @@ -12,7 +12,7 @@ import org.junit.Test */ class MQSecurityAsRPCTest : MQSecurityTest() { override fun createAttacker(): SimpleMQClient { - return clientTo(alice.configuration.rpcAddress!!) + return clientTo(alice.internals.configuration.rpcAddress!!) } @Test @@ -30,7 +30,7 @@ class MQSecurityAsRPCTest : MQSecurityTest() { @Test fun `login to a ssl port as a RPC user`() { assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - loginToRPC(alice.configuration.p2pAddress, extraRPCUsers[0], configureTestSSL()) + loginToRPC(alice.internals.configuration.p2pAddress, extraRPCUsers[0], configureTestSSL()) } } } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index af40eb7adf..675f6577ec 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -16,6 +16,7 @@ import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.core.utilities.unwrap import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS @@ -43,7 +44,7 @@ import kotlin.test.assertEquals */ abstract class MQSecurityTest : NodeBasedTest() { val rpcUser = User("user1", "pass", permissions = emptySet()) - lateinit var alice: Node + lateinit var alice: StartedNode lateinit var attacker: SimpleMQClient private val clients = ArrayList() @@ -155,9 +156,9 @@ abstract class MQSecurityTest : NodeBasedTest() { } fun loginToRPCAndGetClientQueue(): String { - loginToRPC(alice.configuration.rpcAddress!!, rpcUser) + loginToRPC(alice.internals.configuration.rpcAddress!!, rpcUser) val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*") - val client = clientTo(alice.configuration.rpcAddress!!) + val client = clientTo(alice.internals.configuration.rpcAddress!!) client.start(rpcUser.username, rpcUser.password, false) return client.session.addressQuery(clientQueueQuery).queueNames.single().toString() } @@ -217,7 +218,7 @@ abstract class MQSecurityTest : NodeBasedTest() { private fun startBobAndCommunicateWithAlice(): Party { val bob = startNode(BOB.name).getOrThrow() - bob.registerInitiatedFlow(ReceiveFlow::class.java) + bob.internals.registerInitiatedFlow(ReceiveFlow::class.java) val bobParty = bob.info.legalIdentity // Perform a protocol exchange to force the peer queue to be created alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow() diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index b10d05f4e4..3cdbfdbdc0 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -14,7 +14,7 @@ import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds -import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.node.services.messaging.* import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.SimpleNotaryService @@ -149,7 +149,7 @@ class P2PMessagingTest : NodeBasedTest() { // Wait until the first request is received crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) // Stop alice's node after we ensured that the first request was delivered and ignored. - alice.stop() + alice.dispose() val numberOfRequestsReceived = crashingNodes.requestsReceived.get() assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) @@ -174,7 +174,7 @@ class P2PMessagingTest : NodeBasedTest() { * either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true. * This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response. */ - private fun simulateCrashingNodes(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): CrashingNodes { + private fun simulateCrashingNodes(distributedServiceNodes: List>, dummyTopic: String, responseMessage: String): CrashingNodes { val crashingNodes = CrashingNodes( requestsReceived = AtomicInteger(0), firstRequestReceived = CountDownLatch(1), @@ -203,7 +203,7 @@ class P2PMessagingTest : NodeBasedTest() { return crashingNodes } - private fun assertAllNodesAreUsed(participatingServiceNodes: List, serviceName: CordaX500Name, originatingNode: Node) { + private fun assertAllNodesAreUsed(participatingServiceNodes: List>, serviceName: CordaX500Name, originatingNode: StartedNode<*>) { // Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used participatingServiceNodes.forEach { node -> node.respondWith(node.info) @@ -221,10 +221,10 @@ class P2PMessagingTest : NodeBasedTest() { break } } - assertThat(participatingNodes).containsOnlyElementsOf(participatingServiceNodes.map(Node::info)) + assertThat(participatingNodes).containsOnlyElementsOf(participatingServiceNodes.map(StartedNode<*>::info)) } - private fun Node.respondWith(message: Any) { + private fun StartedNode<*>.respondWith(message: Any) { network.addMessageHandler(javaClass.name) { netMessage, _ -> val request = netMessage.data.deserialize() val response = network.createMessage(javaClass.name, request.sessionID, message.serialize().bytes) @@ -232,7 +232,7 @@ class P2PMessagingTest : NodeBasedTest() { } } - private fun Node.receiveFrom(target: MessageRecipients): CordaFuture { + private fun StartedNode<*>.receiveFrom(target: MessageRecipients): CordaFuture { val request = TestRequest(replyTo = network.myAddress) return network.sendRequest(javaClass.name, request, target) } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 4e5d08c746..a3ba2195ac 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -33,7 +33,7 @@ class P2PSecurityTest : NodeBasedTest() { val incorrectNetworkMapName = getX500Name(O = "NetworkMap-${random63BitValue()}", L = "London", C = "GB") val node = startNode(BOB.name, configOverrides = mapOf( "networkMapService" to mapOf( - "address" to networkMapNode.configuration.p2pAddress.toString(), + "address" to networkMapNode.internals.configuration.p2pAddress.toString(), "legalName" to incorrectNetworkMapName.toString() ) )) @@ -59,7 +59,7 @@ class P2PSecurityTest : NodeBasedTest() { val config = testNodeConfiguration( baseDirectory = baseDirectory(legalName.x500Name), myLegalName = legalName).also { - whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.configuration.p2pAddress, networkMapNode.info.legalIdentity.name)) + whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.legalIdentity.name)) } config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name return SimpleNode(config, trustRoot = trustRoot).apply { start() } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 393c3ce986..8c5f7d5521 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -45,12 +45,9 @@ import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.sendRequest -import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.network.* import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NetworkMapService.RegistrationResponse -import net.corda.node.services.network.NodeRegistration -import net.corda.node.services.network.PersistentNetworkMapCache -import net.corda.node.services.network.PersistentNetworkMapService import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -104,6 +101,18 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val advertisedServices: Set, val platformClock: Clock, @VisibleForTesting val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { + private class StartedNodeImpl( + override val internals: N, + override val services: ServiceHubInternalImpl, + override val info: NodeInfo, + override val checkpointStorage: CheckpointStorage, + override val smm: StateMachineManager, + override val attachments: NodeAttachmentService, + override val inNodeNetworkMapService: NetworkMapService, + override val network: MessagingService, + override val database: CordaPersistence, + override val rpcOps: CordaRPCOps) : StartedNode + // TODO: Persist this, as well as whether the node is registered. /** * Sequence number of changes sent to the network map service, when registering/de-registering this node. @@ -122,17 +131,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private val flowFactories = ConcurrentHashMap>, InitiatedFlowFactory<*>>() protected val partyKeys = mutableSetOf() - val services: ServiceHubInternal get() = _services - + protected val services: ServiceHubInternal get() = _services private lateinit var _services: ServiceHubInternalImpl - lateinit var info: NodeInfo - lateinit var checkpointStorage: CheckpointStorage - lateinit var smm: StateMachineManager - lateinit var attachments: NodeAttachmentService - var inNodeNetworkMapService: NetworkMapService? = null - lateinit var network: MessagingService + protected lateinit var info: NodeInfo + protected lateinit var checkpointStorage: CheckpointStorage + protected lateinit var smm: StateMachineManager + protected lateinit var attachments: NodeAttachmentService + protected lateinit var inNodeNetworkMapService: NetworkMapService + protected lateinit var network: MessagingService protected val runOnStop = ArrayList<() -> Any?>() - lateinit var database: CordaPersistence + protected lateinit var database: CordaPersistence protected var dbCloser: (() -> Any?)? = null protected val _nodeReadyFuture = openFuture() @@ -160,17 +168,17 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, cordappLoader.cordapps.flatMap { it.plugins } + DefaultWhitelist() } - /** Set to true once [start] has been successfully called. */ - @Volatile - var started = false - private set + /** Set to non-null once [start] has been successfully called. */ + open val started get() = _started + @Volatile private var _started: StartedNode? = null /** The implementation of the [CordaRPCOps] interface used by this node. */ - open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM. - - open fun start() { - require(!started) { "Node has already been started" } + open fun makeRPCOps(): CordaRPCOps { + return CordaRPCOpsImpl(services, smm, database) + } + open fun start(): StartedNode { + require(started == null) { "Node has already been started" } if (configuration.devMode) { log.warn("Corda node is running in dev mode.") configuration.configureWithDevSSLCertificate() @@ -180,7 +188,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, log.info("Node starting up ...") // Do all of this in a database transaction so anything that might need a connection has one. - initialiseDatabasePersistence { + val startedImpl = initialiseDatabasePersistence { val tokenizableServices = makeServices() smm = StateMachineManager(services, @@ -202,6 +210,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, makeVaultObservers() + val rpcOps = makeRPCOps() startMessagingService(rpcOps) installCoreFlows() @@ -211,16 +220,19 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, registerCustomSchemas(cordappLoader.cordapps.flatMap { it.customSchemas }.toSet()) runOnStop += network::stop + StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps) } // If we successfully loaded network data from database, we set this future to Unit. _nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured()) - database.transaction { - smm.start() - // Shut down the SMM so no Fibers are scheduled. - runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } - _services.schedulerService.start() + return startedImpl.apply { + database.transaction { + smm.start() + // Shut down the SMM so no Fibers are scheduled. + runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } + services.schedulerService.start() + } + _started = this } - started = true } private class ServiceInstantiationException(cause: Throwable?) : Exception(cause) @@ -409,7 +421,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // Specific class so that MockNode can catch it. class DatabaseConfigurationException(msg: String) : Exception(msg) - protected open fun initialiseDatabasePersistence(insideTransaction: () -> Unit) { + protected open fun initialiseDatabasePersistence(insideTransaction: () -> T): T { val props = configuration.dataSourceProperties if (props.isNotEmpty()) { this.database = configureDatabase(props, configuration.database, { _services.schemaService }, createIdentityService = { _services.identityService }) @@ -421,7 +433,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, dbCloser = it runOnStop += it } - database.transaction { + return database.transaction { insideTransaction() } } else { @@ -431,7 +443,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private fun makeAdvertisedServices(tokenizableServices: MutableList) { val serviceTypes = info.advertisedServices.map { it.info.type } - if (NetworkMapService.type in serviceTypes) makeNetworkMapService() + inNodeNetworkMapService = if (NetworkMapService.type in serviceTypes) makeNetworkMapService() else NullNetworkMapService val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() } if (notaryServiceType != null) { val service = makeCoreNotaryService(notaryServiceType) @@ -452,7 +464,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private fun registerWithNetworkMapIfConfigured(): CordaFuture { services.networkMapCache.addNode(info) // In the unit test environment, we may sometimes run without any network map service - return if (networkMapAddress == null && inNodeNetworkMapService == null) { + return if (networkMapAddress == null && inNodeNetworkMapService == NullNetworkMapService) { services.networkMapCache.runWithoutMapService() noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache } else { @@ -513,8 +525,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, return PersistentKeyManagementService(identityService, partyKeys) } - open protected fun makeNetworkMapService() { - inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion) + open protected fun makeNetworkMapService(): NetworkMapService { + return PersistentNetworkMapService(services, configuration.minimumPlatformVersion) } open protected fun makeCoreNotaryService(type: ServiceType): NotaryService? { diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 9c69148450..bf3fba166e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.thenMatch +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.RPCOps import net.corda.core.node.ServiceHub import net.corda.core.node.services.ServiceInfo @@ -279,7 +280,7 @@ open class Node(override val configuration: FullNodeConfiguration, * This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details * on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url */ - override fun initialiseDatabasePersistence(insideTransaction: () -> Unit) { + override fun initialiseDatabasePersistence(insideTransaction: () -> T): T { val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url") val h2Prefix = "jdbc:h2:file:" if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) { @@ -296,25 +297,24 @@ open class Node(override val configuration: FullNodeConfiguration, printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node") } } - super.initialiseDatabasePersistence(insideTransaction) + return super.initialiseDatabasePersistence(insideTransaction) } private val _startupComplete = openFuture() val startupComplete: CordaFuture get() = _startupComplete - override fun start() { + override fun start(): StartedNode { if (initialiseSerialization) { initialiseSerialization() } - super.start() - + val started: StartedNode = uncheckedCast(super.start()) nodeReadyFuture.thenMatch({ serverThread.execute { // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: // // https://jolokia.org/agent/jvm.html JmxReporter. - forRegistry(services.monitoringService.metrics). + forRegistry(started.services.monitoringService.metrics). inDomain("net.corda"). createsObjectNamesWith { _, domain, name -> // Make the JMX hierarchy a bit better organised. @@ -336,6 +336,7 @@ open class Node(override val configuration: FullNodeConfiguration, shutdownHook = addShutdownHook { stop() } + return started } private fun initialiseSerialization() { diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 88f67b0b5a..4fdd45d9cb 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -92,18 +92,16 @@ open class NodeStartup(val args: Array) { open protected fun startNode(conf: FullNodeConfiguration, versionInfo: VersionInfo, startTime: Long, cmdlineOptions: CmdLineOptions) { val advertisedServices = conf.calculateServices() - val node = createNode(conf, versionInfo, advertisedServices) - node.start() - printPluginsAndServices(node) - - node.nodeReadyFuture.thenMatch({ + val node = createNode(conf, versionInfo, advertisedServices).start() + printPluginsAndServices(node.internals) + node.internals.nodeReadyFuture.thenMatch({ val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0 val name = node.info.legalIdentity.name.organisation Node.printBasicNodeInfo("Node for \"$name\" started up and registered in $elapsed sec") // Don't start the shell if there's no console attached. val runShell = !cmdlineOptions.noLocalShell && System.console() != null - node.startupComplete.then { + node.internals.startupComplete.then { try { InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, node) } catch(e: Throwable) { @@ -114,7 +112,7 @@ open class NodeStartup(val args: Array) { { th -> logger.error("Unexpected exception during registration", th) }) - node.run() + node.internals.run() } open protected fun logStartupInfo(versionInfo: VersionInfo, cmdlineOptions: CmdLineOptions, conf: FullNodeConfiguration) { diff --git a/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt new file mode 100644 index 0000000000..51f2ae7685 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt @@ -0,0 +1,25 @@ +package net.corda.node.internal + +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.node.NodeInfo +import net.corda.node.services.api.CheckpointStorage +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.utilities.CordaPersistence + +interface StartedNode { + val internals: N + val services: ServiceHubInternal + val info: NodeInfo + val checkpointStorage: CheckpointStorage + val smm: StateMachineManager + val attachments: NodeAttachmentService + val inNodeNetworkMapService: NetworkMapService + val network: MessagingService + val database: CordaPersistence + val rpcOps: CordaRPCOps + fun dispose() = internals.stop() +} diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt index 18352367fe..f7c4da9b73 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt @@ -114,6 +114,8 @@ interface NetworkMapService { data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients) } +object NullNetworkMapService : NetworkMapService + @ThreadSafe class InMemoryNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int) : AbstractNetworkMapService(services, minimumPlatformVersion) { diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt index cc863b3193..27d790e8f1 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt @@ -24,6 +24,7 @@ import net.corda.core.utilities.loggerFor import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.StringToMethodCallParser import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT import net.corda.node.services.messaging.RpcContext import net.corda.node.services.statemachine.FlowStateMachineImpl @@ -75,13 +76,13 @@ import kotlin.concurrent.thread object InteractiveShell { private val log = loggerFor() - private lateinit var node: Node + private lateinit var node: StartedNode /** * Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node * internals. */ - fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: Node) { + fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: StartedNode) { this.node = node var runSSH = runSSHServer @@ -136,7 +137,7 @@ object InteractiveShell { jlineProcessor.closed() log.info("Command shell has exited") terminal.restore() - node.stop() + node.dispose() } } @@ -168,7 +169,7 @@ object InteractiveShell { } } val attributes = mapOf( - "node" to node, + "node" to node.internals, "services" to node.services, "ops" to node.rpcOps, "mapper" to yamlInputMapper diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 1fd26ec47c..2ea61fbf5a 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -23,6 +23,7 @@ import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.node.internal.CordaRPCOpsImpl +import net.corda.node.internal.StartedNode import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT import net.corda.node.services.messaging.RpcContext import net.corda.node.services.network.NetworkMapService @@ -55,8 +56,8 @@ class CordaRPCOpsImplTest { } lateinit var mockNet: MockNetwork - lateinit var aliceNode: MockNode - lateinit var notaryNode: MockNode + lateinit var aliceNode: StartedNode + lateinit var notaryNode: StartedNode lateinit var rpc: CordaRPCOps lateinit var stateMachineUpdates: Observable lateinit var transactions: Observable @@ -75,7 +76,7 @@ class CordaRPCOpsImplTest { )))) mockNet.runNetwork() - networkMap.ensureRegistered() + networkMap.internals.ensureRegistered() } @After diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 794c1793c9..b0bb9c7b41 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -36,7 +36,7 @@ import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.asset.* import net.corda.finance.flows.TwoPartyTradeFlow.Buyer import net.corda.finance.flows.TwoPartyTradeFlow.Seller -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.persistence.DBTransactionStorage @@ -46,6 +46,7 @@ import net.corda.testing.* import net.corda.testing.contracts.fillWithSomeTestCash import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.MockNetwork +import net.corda.testing.node.pumpReceive import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before @@ -98,8 +99,8 @@ class TwoPartyTradeFlowTests { val cashIssuer = bankNode.info.legalIdentity.ref(1) val cpIssuer = bankNode.info.legalIdentity.ref(1, 2, 3) - aliceNode.disableDBCloseOnStop() - bobNode.disableDBCloseOnStop() + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() bobNode.database.transaction { bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, outputNotary = notaryNode.info.notaryIdentity, @@ -120,17 +121,17 @@ class TwoPartyTradeFlowTests { // assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id]) assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow()) - aliceNode.stop() - bobNode.stop() + aliceNode.dispose() + bobNode.dispose() aliceNode.database.transaction { assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() } - aliceNode.manuallyCloseDB() + aliceNode.internals.manuallyCloseDB() bobNode.database.transaction { assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() } - bobNode.manuallyCloseDB() + bobNode.internals.manuallyCloseDB() } } @@ -145,8 +146,8 @@ class TwoPartyTradeFlowTests { val bankNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOC.name) val issuer = bankNode.info.legalIdentity.ref(1) - aliceNode.disableDBCloseOnStop() - bobNode.disableDBCloseOnStop() + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() val cashStates = bobNode.database.transaction { bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, notaryNode.info.notaryIdentity, 3, 3, @@ -174,17 +175,17 @@ class TwoPartyTradeFlowTests { assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow()) - aliceNode.stop() - bobNode.stop() + aliceNode.dispose() + bobNode.dispose() aliceNode.database.transaction { assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() } - aliceNode.manuallyCloseDB() + aliceNode.internals.manuallyCloseDB() bobNode.database.transaction { assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() } - bobNode.manuallyCloseDB() + bobNode.internals.manuallyCloseDB() } } @@ -212,8 +213,8 @@ class TwoPartyTradeFlowTests { bobNode.database.transaction { bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.legalIdentityAndCert) } - aliceNode.disableDBCloseOnStop() - bobNode.disableDBCloseOnStop() + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle val networkMapAddress = notaryNode.network.myAddress @@ -255,7 +256,7 @@ class TwoPartyTradeFlowTests { assertThat(bobTransactionsBeforeCrash).isNotEmpty // .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk. - bobNode.stop() + bobNode.dispose() // Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use. // She will wait around until Bob comes back. @@ -272,7 +273,7 @@ class TwoPartyTradeFlowTests { entropyRoot: BigInteger): MockNetwork.MockNode { return MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, bobAddr.id, overrideServices, entropyRoot) } - }, true, BOB.name) + }, BOB.name) // Find the future representing the result of this state machine again. val bobFuture = bobNode.smm.findStateMachines(BuyerAcceptor::class.java).single().second @@ -298,8 +299,8 @@ class TwoPartyTradeFlowTests { assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash) } - aliceNode.manuallyCloseDB() - bobNode.manuallyCloseDB() + aliceNode.internals.manuallyCloseDB() + bobNode.internals.manuallyCloseDB() } } @@ -307,7 +308,7 @@ class TwoPartyTradeFlowTests { // of gets and puts. private fun makeNodeWithTracking( networkMapAddress: SingleMessageRecipient?, - name: CordaX500Name): MockNetwork.MockNode { + name: CordaX500Name): StartedNode { // Create a node in the mock network ... return mockNet.createNode(networkMapAddress, nodeFactory = object : MockNetwork.Factory { override fun create(config: NodeConfiguration, @@ -337,7 +338,7 @@ class TwoPartyTradeFlowTests { val issuer = bankNode.info.legalIdentity.ref(1, 2, 3) mockNet.runNetwork() - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode) allNodes.forEach { node -> @@ -448,7 +449,7 @@ class TwoPartyTradeFlowTests { val issuer = bankNode.info.legalIdentity.ref(1, 2, 3) mockNet.runNetwork() - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode) allNodes.forEach { node -> @@ -548,12 +549,12 @@ class TwoPartyTradeFlowTests { val sellerId: StateMachineRunId ) - private fun runBuyerAndSeller(notaryNode: MockNetwork.MockNode, - sellerNode: MockNetwork.MockNode, - buyerNode: MockNetwork.MockNode, + private fun runBuyerAndSeller(notaryNode: StartedNode, + sellerNode: StartedNode, + buyerNode: StartedNode, assetToSell: StateAndRef, anonymous: Boolean = true): RunResult { - val buyerFlows: Observable> = buyerNode.registerInitiatedFlow(BuyerAcceptor::class.java) + val buyerFlows: Observable> = buyerNode.internals.registerInitiatedFlow(BuyerAcceptor::class.java) val firstBuyerFiber = buyerFlows.toFuture().map { it.stateMachine } val seller = SellerInitiator(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, anonymous) val sellerResult = sellerNode.services.startFlow(seller).resultFuture @@ -610,7 +611,7 @@ class TwoPartyTradeFlowTests { val issuer = bankNode.info.legalIdentity.ref(1, 2, 3) mockNet.runNetwork() - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() // Let the nodes know about each other - normally the network map would handle this val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode) @@ -653,9 +654,9 @@ class TwoPartyTradeFlowTests { private fun insertFakeTransactions( wtxToSign: List, - node: AbstractNode, - notaryNode: AbstractNode, - vararg extraSigningNodes: AbstractNode): Map { + node: StartedNode<*>, + notaryNode: StartedNode<*>, + vararg extraSigningNodes: StartedNode<*>): Map { val signed = wtxToSign.map { val id = it.id diff --git a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt index f1710701d0..91726bd62d 100644 --- a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt @@ -11,7 +11,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.SimpleNotaryService import net.corda.testing.DUMMY_NOTARY @@ -31,10 +31,10 @@ import kotlin.test.assertTrue class NotaryChangeTests { lateinit var mockNet: MockNetwork - lateinit var oldNotaryNode: MockNetwork.MockNode - lateinit var newNotaryNode: MockNetwork.MockNode - lateinit var clientNodeA: MockNetwork.MockNode - lateinit var clientNodeB: MockNetwork.MockNode + lateinit var oldNotaryNode: StartedNode + lateinit var newNotaryNode: StartedNode + lateinit var clientNodeA: StartedNode + lateinit var clientNodeB: StartedNode @Before fun setUp() { @@ -47,7 +47,7 @@ class NotaryChangeTests { newNotaryNode = mockNet.createNode(networkMapAddress = oldNotaryNode.network.myAddress, advertisedServices = ServiceInfo(SimpleNotaryService.type)) mockNet.runNetwork() // Clear network map registration messages - oldNotaryNode.ensureRegistered() + oldNotaryNode.internals.ensureRegistered() } @After @@ -132,7 +132,7 @@ class NotaryChangeTests { } } - private fun issueEncumberedState(node: AbstractNode, notaryNode: AbstractNode): WireTransaction { + private fun issueEncumberedState(node: StartedNode<*>, notaryNode: StartedNode<*>): WireTransaction { val owner = node.info.legalIdentity.ref(0) val notary = notaryNode.info.notaryIdentity @@ -160,7 +160,7 @@ class NotaryChangeTests { // - The transaction type is not a notary change transaction at all. } -fun issueState(node: AbstractNode, notaryNode: AbstractNode): StateAndRef<*> { +fun issueState(node: StartedNode<*>, notaryNode: StartedNode<*>): StateAndRef<*> { val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0)) val signedByNode = node.services.signInitialTransaction(tx) val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey) @@ -168,7 +168,7 @@ fun issueState(node: AbstractNode, notaryNode: AbstractNode): StateAndRef<*> { return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0)) } -fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: AbstractNode): StateAndRef { +fun issueMultiPartyState(nodeA: StartedNode<*>, nodeB: StartedNode<*>, notaryNode: StartedNode<*>): StateAndRef { val state = TransactionState(DummyContract.MultiOwnerState(0, listOf(nodeA.info.legalIdentity, nodeB.info.legalIdentity)), DUMMY_PROGRAM_ID, notaryNode.info.notaryIdentity) val tx = TransactionBuilder(notary = notaryNode.info.notaryIdentity).withItems(state, dummyCommand()) @@ -181,7 +181,7 @@ fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: A return stateAndRef } -fun issueInvalidState(node: AbstractNode, notary: Party): StateAndRef<*> { +fun issueInvalidState(node: StartedNode<*>, notary: Party): StateAndRef<*> { val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0)) tx.setTimeWindow(Instant.now(), 30.seconds) val stx = node.services.signInitialTransaction(tx) diff --git a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt index e236a24122..1e9ce8ecdf 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt @@ -16,12 +16,12 @@ import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.SortAttribute import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.testing.DUMMY_NOTARY import net.corda.testing.contracts.DUMMY_PROGRAM_ID -import net.corda.testing.contracts.DummyContract import net.corda.testing.dummyCommand import net.corda.testing.node.MockNetwork import org.junit.After @@ -37,9 +37,9 @@ class ScheduledFlowTests { val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC))) } lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var nodeA: MockNetwork.MockNode - lateinit var nodeB: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var nodeA: StartedNode + lateinit var nodeB: StartedNode data class ScheduledState(val creationTime: Instant, val source: Party, @@ -101,12 +101,14 @@ class ScheduledFlowTests { notaryNode = mockNet.createNode( legalName = DUMMY_NOTARY.name, advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))) - nodeA = mockNet.createNode(notaryNode.network.myAddress, start = false) - nodeB = mockNet.createNode(notaryNode.network.myAddress, start = false) + val a = mockNet.createUnstartedNode(notaryNode.network.myAddress) + val b = mockNet.createUnstartedNode(notaryNode.network.myAddress) - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() mockNet.startNodes() + nodeA = a.started!! + nodeB = b.started!! } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt index ab80752849..683f0f8571 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt @@ -7,6 +7,8 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.serialization.deserialize import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode +import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.send @@ -42,8 +44,8 @@ import java.util.concurrent.LinkedBlockingQueue abstract class AbstractNetworkMapServiceTest { lateinit var mockNet: MockNetwork - lateinit var mapServiceNode: MockNode - lateinit var alice: MockNode + lateinit var mapServiceNode: StartedNode + lateinit var alice: StartedNode companion object { val subscriberLegalName = CordaX500Name(organisation ="Subscriber", locality ="New York", country ="US") @@ -188,7 +190,7 @@ abstract class AbstractNetworkMapServiceTest assertThat(updates.last().wireReg.verified().serial).isEqualTo(serial) } - private fun MockNode.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List { + private fun StartedNode<*>.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List { val request = FetchMapRequest(subscribe, ifChangedSinceVersion, network.myAddress) val response = services.networkService.sendRequest(FETCH_TOPIC, request, mapServiceNode.network.myAddress) mockNet.runNetwork() @@ -200,7 +202,7 @@ abstract class AbstractNetworkMapServiceTest REMOVE -> Removed(node) } - private fun MockNode.identityQuery(): NodeInfo? { + private fun StartedNode<*>.identityQuery(): NodeInfo? { val request = QueryIdentityRequest(info.legalIdentityAndCert, network.myAddress) val response = services.networkService.sendRequest(QUERY_TOPIC, request, mapServiceNode.network.myAddress) mockNet.runNetwork() @@ -209,7 +211,7 @@ abstract class AbstractNetworkMapServiceTest private var lastSerial = Long.MIN_VALUE - private fun MockNode.registration(addOrRemove: AddOrRemove, + private fun StartedNode<*>.registration(addOrRemove: AddOrRemove, serial: Long? = null): CordaFuture { val distinctSerial = if (serial == null) { ++lastSerial @@ -225,7 +227,7 @@ abstract class AbstractNetworkMapServiceTest return response } - private fun MockNode.subscribe(): Queue { + private fun StartedNode<*>.subscribe(): Queue { val request = SubscribeRequest(true, network.myAddress) val updates = LinkedBlockingQueue() services.networkService.addMessageHandler(PUSH_TOPIC) { message, _ -> @@ -237,37 +239,37 @@ abstract class AbstractNetworkMapServiceTest return updates } - private fun MockNode.unsubscribe() { + private fun StartedNode<*>.unsubscribe() { val request = SubscribeRequest(false, network.myAddress) val response = services.networkService.sendRequest(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress) mockNet.runNetwork() assertThat(response.getOrThrow().confirmed).isTrue() } - private fun MockNode.ackUpdate(mapVersion: Int) { + private fun StartedNode<*>.ackUpdate(mapVersion: Int) { val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress) services.networkService.send(PUSH_ACK_TOPIC, MessagingService.DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress) mockNet.runNetwork() } - private fun addNewNodeToNetworkMap(legalName: CordaX500Name): MockNode { + private fun addNewNodeToNetworkMap(legalName: CordaX500Name): StartedNode { val node = mockNet.createNode(mapServiceNode.network.myAddress, legalName = legalName) mockNet.runNetwork() lastSerial = System.currentTimeMillis() return node } - private fun newNodeSeparateFromNetworkMap(legalName: CordaX500Name): MockNode { + private fun newNodeSeparateFromNetworkMap(legalName: CordaX500Name): StartedNode { return mockNet.createNode(legalName = legalName, nodeFactory = NoNMSNodeFactory) } sealed class Changed { data class Added(val node: NodeInfo) : Changed() { - constructor(node: MockNode) : this(node.info) + constructor(node: StartedNode<*>) : this(node.info) } data class Removed(val node: NodeInfo) : Changed() { - constructor(node: MockNode) : this(node.info) + constructor(node: StartedNode<*>) : this(node.info) } } @@ -280,7 +282,7 @@ abstract class AbstractNetworkMapServiceTest overrideServices: Map?, entropyRoot: BigInteger): MockNode { return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - override fun makeNetworkMapService() {} + override fun makeNetworkMapService() = NullNetworkMapService } } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index 48b0c57b4d..64fb778651 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -9,6 +9,7 @@ import net.corda.core.identity.Party import net.corda.core.node.NodeInfo import net.corda.core.utilities.* import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.testing.CHARLIE @@ -28,11 +29,11 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { @Before fun start() { val nodes = startNodesWithPort(partiesList) - nodes.forEach { it.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting. + nodes.forEach { it.internals.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting. nodes.forEach { infos.add(it.info) addressesMap[it.info.legalIdentity.name] = it.info.addresses[0] - it.stop() // We want them to communicate with NetworkMapService to save data to cache. + it.dispose() // We want them to communicate with NetworkMapService to save data to cache. } } @@ -63,7 +64,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] val partyNodes = alice.services.networkMapCache.partyNodes assert(NetworkMapService.type !in alice.info.advertisedServices.map { it.info.type }) - assertEquals(null, alice.inNodeNetworkMapService) + assertEquals(NullNetworkMapService, alice.inNodeNetworkMapService) assertEquals(infos.size, partyNodes.size) assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) } @@ -72,7 +73,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() { val parties = partiesList.subList(1, partiesList.size) val nodes = startNodesWithPort(parties, noNetworkMap = true) - assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService }) assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) nodes.forEach { val partyNodes = it.services.networkMapCache.partyNodes @@ -86,7 +87,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() { val parties = partiesList.subList(1, partiesList.size) val nodes = startNodesWithPort(parties, noNetworkMap = false) - assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService }) assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) nodes.forEach { val partyNodes = it.services.networkMapCache.partyNodes @@ -123,13 +124,13 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { } // Start Network Map and see that charlie node appears in caches. val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0] - nms.startupComplete.get() - assert(nms.inNodeNetworkMapService != null) + nms.internals.startupComplete.get() + assert(nms.inNodeNetworkMapService != NullNetworkMapService) assert(infos.any {it.legalIdentity == nms.info.legalIdentity}) otherNodes.forEach { assert(nms.info.legalIdentity in it.services.networkMapCache.partyNodes.map { it.legalIdentity }) } - charlie.nodeReadyFuture.get() // Finish registration. + charlie.internals.nodeReadyFuture.get() // Finish registration. checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. val cacheA = otherNodes[0].services.networkMapCache.partyNodes val cacheB = otherNodes[1].services.networkMapCache.partyNodes @@ -142,7 +143,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { // HELPERS // Helper function to restart nodes with the same host and port. - private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false): List { + private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false): List> { return nodesToStart.map { party -> val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap() if (party == DUMMY_NOTARY) { @@ -158,10 +159,10 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { } // Check that nodes are functional, communicate each with each. - private fun checkConnectivity(nodes: List) { + private fun checkConnectivity(nodes: List>) { nodes.forEach { node1 -> nodes.forEach { node2 -> - node2.registerInitiatedFlow(SendBackFlow::class.java) + node2.internals.registerInitiatedFlow(SendBackFlow::class.java) val resultFuture = node1.services.startFlow(SendFlow(node2.info.legalIdentity)).resultFuture assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") } diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapServiceTest.kt index 27dac2cd35..340e5a7d09 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapServiceTest.kt @@ -35,9 +35,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest?, entropyRoot: BigInteger): MockNode { return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - override fun makeNetworkMapService() { - inNodeNetworkMapService = SwizzleNetworkMapService(services) - } + override fun makeNetworkMapService() = SwizzleNetworkMapService(services) } } } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt index d9b46706bd..a69356745c 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt @@ -13,11 +13,11 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.finance.USD import net.corda.finance.contracts.asset.Cash +import net.corda.node.internal.StartedNode import net.corda.node.services.NotifyTransactionHandler import net.corda.testing.DUMMY_NOTARY import net.corda.testing.MEGA_CORP import net.corda.testing.node.MockNetwork -import net.corda.testing.node.MockNetwork.MockNode import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before @@ -95,8 +95,8 @@ class DataVendingServiceTests { } } - private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) { - walletServiceNode.registerInitiatedFlow(InitiateNotifyTxFlow::class.java) + private fun StartedNode<*>.sendNotifyTx(tx: SignedTransaction, walletServiceNode: StartedNode<*>) { + walletServiceNode.internals.registerInitiatedFlow(InitiateNotifyTxFlow::class.java) services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx)) mockNet.runNetwork() } diff --git a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt index e83163cb0b..5e9698b68b 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt @@ -27,7 +27,7 @@ class NodeSchemaServiceTest { val mockNode = mockNet.createNode() mockNet.runNetwork() - mockNode.registerCustomSchemas(setOf(DummyLinearStateSchemaV1)) + mockNode.internals.registerCustomSchemas(setOf(DummyLinearStateSchemaV1)) val schemaService = mockNode.services.schemaService assertTrue(schemaService.schemaOptions.containsKey(DummyLinearStateSchemaV1)) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index a3c365a801..03bef9cb85 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -31,6 +31,7 @@ import net.corda.finance.DOLLARS import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.internal.StartedNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.checkpoints import net.corda.node.services.transactions.ValidatingNotaryService @@ -42,6 +43,7 @@ import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode +import net.corda.testing.node.pumpReceive import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType @@ -66,10 +68,10 @@ class FlowFrameworkTests { private val mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin()) private val receivedSessionMessages = ArrayList() - private lateinit var node1: MockNode - private lateinit var node2: MockNode - private lateinit var notary1: MockNode - private lateinit var notary2: MockNode + private lateinit var node1: StartedNode + private lateinit var node2: StartedNode + private lateinit var notary1: StartedNode + private lateinit var notary2: StartedNode @Before fun start() { @@ -77,7 +79,7 @@ class FlowFrameworkTests { node2 = mockNet.createNode(networkMapAddress = node1.network.myAddress) mockNet.runNetwork() - node1.ensureRegistered() + node1.internals.ensureRegistered() // We intentionally create our own notary and ignore the one provided by the network val notaryKeyPair = generateKeyPair() @@ -111,7 +113,7 @@ class FlowFrameworkTests { @Test fun `newly added flow is preserved on restart`() { node1.services.startFlow(NoOpFlow(nonTerminating = true)) - node1.acceptableLiveFiberCountOnStop = 1 + node1.internals.acceptableLiveFiberCountOnStop = 1 val restoredFlow = node1.restartAndGetRestoredFlow() assertThat(restoredFlow.flowStarted).isTrue() } @@ -149,9 +151,9 @@ class FlowFrameworkTests { // We push through just enough messages to get only the payload sent node2.pumpReceive() - node2.disableDBCloseOnStop() - node2.acceptableLiveFiberCountOnStop = 1 - node2.stop() + node2.internals.disableDBCloseOnStop() + node2.internals.acceptableLiveFiberCountOnStop = 1 + node2.dispose() mockNet.runNetwork() val restoredFlow = node2.restartAndGetRestoredFlow(node1) assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") @@ -173,22 +175,22 @@ class FlowFrameworkTests { val flow = NoOpFlow() node3.services.startFlow(flow) assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet - node3.disableDBCloseOnStop() + node3.internals.disableDBCloseOnStop() node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network. - node3.stop() + node3.dispose() - node3 = mockNet.createNode(node1.network.myAddress, node3.id) + node3 = mockNet.createNode(node1.network.myAddress, node3.internals.id) val restoredFlow = node3.getSingleFlow().first assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet mockNet.runNetwork() // Allow network map messages to flow node3.smm.executor.flush() assertEquals(true, restoredFlow.flowStarted) // Now we should have run the flow and hopefully cleared the init checkpoint - node3.disableDBCloseOnStop() + node3.internals.disableDBCloseOnStop() node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network. - node3.stop() + node3.dispose() // Now it is completed the flow should leave no Checkpoint. - node3 = mockNet.createNode(node1.network.myAddress, node3.id) + node3 = mockNet.createNode(node1.network.myAddress, node3.internals.id) mockNet.runNetwork() // Allow network map messages to flow node3.smm.executor.flush() assertTrue(node3.smm.findStateMachines(NoOpFlow::class.java).isEmpty()) @@ -200,8 +202,8 @@ class FlowFrameworkTests { node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow // Make sure the add() has finished initial processing. node2.smm.executor.flush() - node2.disableDBCloseOnStop() - node2.stop() // kill receiver + node2.internals.disableDBCloseOnStop() + node2.dispose() // kill receiver val restoredFlow = node2.restartAndGetRestoredFlow(node1) assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") } @@ -225,15 +227,15 @@ class FlowFrameworkTests { } // Make sure the add() has finished initial processing. node2.smm.executor.flush() - node2.disableDBCloseOnStop() + node2.internals.disableDBCloseOnStop() // Restart node and thus reload the checkpoint and resend the message with same UUID - node2.stop() + node2.dispose() node2.database.transaction { assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint node2.services.networkMapCache.clearNetworkMapCache() } - val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) - node2.manuallyCloseDB() + val node2b = mockNet.createNode(node1.network.myAddress, node2.internals.id, advertisedServices = *node2.internals.advertisedServices.toTypedArray()) + node2.internals.manuallyCloseDB() val (firstAgain, fut1) = node2b.getSingleFlow() // Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync. mockNet.runNetwork() @@ -285,8 +287,8 @@ class FlowFrameworkTests { //There's no session end from the other flows as they're manually suspended ) - node2.acceptableLiveFiberCountOnStop = 1 - node3.acceptableLiveFiberCountOnStop = 1 + node2.internals.acceptableLiveFiberCountOnStop = 1 + node3.internals.acceptableLiveFiberCountOnStop = 1 } @Test @@ -299,7 +301,7 @@ class FlowFrameworkTests { node3.registerFlowFactory(ReceiveFlow::class) { SendFlow(node3Payload, it) } val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating() node1.services.startFlow(multiReceiveFlow) - node1.acceptableLiveFiberCountOnStop = 1 + node1.internals.acceptableLiveFiberCountOnStop = 1 mockNet.runNetwork() assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload) assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload) @@ -360,32 +362,32 @@ class FlowFrameworkTests { // First Pay expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) { it.message as SessionInit - assertEquals(node1.id, it.from) + assertEquals(node1.internals.id, it.from) assertEquals(notary1Address, it.to) }, expect(match = { it.message is SessionConfirm }) { it.message as SessionConfirm - assertEquals(notary1.id, it.from) + assertEquals(notary1.internals.id, it.from) }, // Second pay expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) { it.message as SessionInit - assertEquals(node1.id, it.from) + assertEquals(node1.internals.id, it.from) assertEquals(notary1Address, it.to) }, expect(match = { it.message is SessionConfirm }) { it.message as SessionConfirm - assertEquals(notary2.id, it.from) + assertEquals(notary2.internals.id, it.from) }, // Third pay expect(match = { it.message is SessionInit && it.message.initiatingFlowClass == NotaryFlow.Client::class.java.name }) { it.message as SessionInit - assertEquals(node1.id, it.from) + assertEquals(node1.internals.id, it.from) assertEquals(notary1Address, it.to) }, expect(match = { it.message is SessionConfirm }) { it.message as SessionConfirm - assertEquals(it.from, notary1.id) + assertEquals(it.from, notary1.internals.id) } ) } @@ -740,26 +742,26 @@ class FlowFrameworkTests { //////////////////////////////////////////////////////////////////////////////////////////////////////////// //region Helpers - private inline fun > MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P { + private inline fun > StartedNode.restartAndGetRestoredFlow(networkMapNode: StartedNode<*>? = null) = internals.run { disableDBCloseOnStop() // Handover DB to new node copy stop() val newNode = mockNet.createNode(networkMapNode?.network?.myAddress, id, advertisedServices = *advertisedServices.toTypedArray()) - newNode.acceptableLiveFiberCountOnStop = 1 + newNode.internals.acceptableLiveFiberCountOnStop = 1 manuallyCloseDB() mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine - return newNode.getSingleFlow

().first + newNode.getSingleFlow

().first } - private inline fun > MockNode.getSingleFlow(): Pair> { + private inline fun > StartedNode<*>.getSingleFlow(): Pair> { return smm.findStateMachines(P::class.java).single() } - private inline fun > MockNode.registerFlowFactory( + private inline fun > StartedNode<*>.registerFlowFactory( initiatingFlowClass: KClass>, initiatedFlowVersion: Int = 1, noinline flowFactory: (Party) -> P): CordaFuture

{ - val observable = internalRegisterFlowFactory( + val observable = internals.internalRegisterFlowFactory( initiatingFlowClass.java, InitiatedFlowFactory.CorDapp(initiatedFlowVersion, "", flowFactory), P::class.java, @@ -775,7 +777,7 @@ class FlowFrameworkTests { private val normalEnd = NormalSessionEnd(0) private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse) - private fun MockNode.sendSessionMessage(message: SessionMessage, destination: MockNode) { + private fun StartedNode<*>.sendSessionMessage(message: SessionMessage, destination: StartedNode<*>) { services.networkService.apply { val address = getAddressOfParty(PartyInfo.Node(destination.info)) send(createMessage(StateMachineManager.sessionTopic, message.serialize().bytes), address) @@ -786,8 +788,8 @@ class FlowFrameworkTests { assertThat(receivedSessionMessages).containsExactly(*expected) } - private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer): List { - val actualForNode = receivedSessionMessages.filter { it.from == node.id || it.to == node.network.myAddress } + private fun assertSessionTransfers(node: StartedNode, vararg expected: SessionTransfer): List { + val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } assertThat(actualForNode).containsExactly(*expected) return actualForNode } @@ -818,8 +820,8 @@ class FlowFrameworkTests { else -> message } - private infix fun MockNode.sent(message: SessionMessage): Pair = Pair(id, message) - private infix fun Pair.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) + private infix fun StartedNode.sent(message: SessionMessage): Pair = Pair(internals.id, message) + private infix fun Pair.to(node: StartedNode<*>): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) private val FlowLogic<*>.progressSteps: CordaFuture>> get() { return progressTracker!!.changes diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt index 45a8bb66dd..c288392bb8 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt @@ -12,7 +12,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.network.NetworkMapService import net.corda.testing.DUMMY_NOTARY import net.corda.testing.contracts.DummyContract @@ -29,8 +29,8 @@ import kotlin.test.assertFailsWith class NotaryServiceTests { lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var clientNode: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var clientNode: StartedNode @Before fun setup() { @@ -40,7 +40,7 @@ class NotaryServiceTests { advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type))) clientNode = mockNet.createNode(notaryNode.network.myAddress) mockNet.runNetwork() // Clear network map registration messages - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() } @After @@ -153,7 +153,7 @@ class NotaryServiceTests { return future } - fun issueState(node: AbstractNode): StateAndRef<*> { + fun issueState(node: StartedNode<*>): StateAndRef<*> { val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0)) val signedByNode = node.services.signInitialTransaction(tx) val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey) diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index f65a9f2ef9..1ff0eccc91 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -12,7 +12,7 @@ import net.corda.core.node.services.ServiceInfo import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.getOrThrow import net.corda.core.transactions.TransactionBuilder -import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.issueInvalidState import net.corda.node.services.network.NetworkMapService import net.corda.testing.DUMMY_NOTARY @@ -30,8 +30,8 @@ import kotlin.test.assertFailsWith class ValidatingNotaryServiceTests { lateinit var mockNet: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var clientNode: MockNetwork.MockNode + lateinit var notaryNode: StartedNode + lateinit var clientNode: StartedNode @Before fun setup() { @@ -42,7 +42,7 @@ class ValidatingNotaryServiceTests { ) clientNode = mockNet.createNode(notaryNode.network.myAddress) mockNet.runNetwork() // Clear network map registration messages - notaryNode.ensureRegistered() + notaryNode.internals.ensureRegistered() } @After @@ -96,7 +96,7 @@ class ValidatingNotaryServiceTests { return future } - fun issueState(node: AbstractNode): StateAndRef<*> { + fun issueState(node: StartedNode<*>): StateAndRef<*> { val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0)) val signedByNode = node.services.signInitialTransaction(tx) val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey) diff --git a/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt b/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt index 1b97c4e325..22c2b92e10 100644 --- a/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt +++ b/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt @@ -201,10 +201,10 @@ class NodeInterestRatesTest : TestDependencyInjectionBase() { val mockNet = MockNetwork(initialiseSerialization = false) val n1 = mockNet.createNotaryNode() val oracleNode = mockNet.createNode(n1.network.myAddress).apply { - registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java) - registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java) + internals.registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java) + internals.registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java) database.transaction { - installCordaService(NodeInterestRates.Oracle::class.java).knownFixes = TEST_DATA + internals.installCordaService(NodeInterestRates.Oracle::class.java).knownFixes = TEST_DATA } } val tx = makePartialTX() diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt index 6ea3085e7b..3f910dbbe2 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt @@ -224,7 +224,7 @@ class NetworkMapVisualiser : Application() { // Flow done; schedule it for removal in a few seconds. We batch them up to make nicer // animations. updateProgressTrackerWidget(change) - println("Flow done for ${node.info.legalIdentity.name}") + println("Flow done for ${node.started!!.info.legalIdentity.name}") viewModel.doneTrackers += tracker } else { // Subflow is done; ignore it. @@ -232,7 +232,7 @@ class NetworkMapVisualiser : Application() { } else if (!viewModel.trackerBoxes.containsKey(tracker)) { // New flow started up; add. val extraLabel = viewModel.simulation.extraNodeLabels[node] - val label = if (extraLabel != null) "${node.info.legalIdentity.name.organisation}: $extraLabel" else node.info.legalIdentity.name.organisation + val label = node.started!!.info.legalIdentity.name.organisation.let { if (extraLabel != null) "$it: $extraLabel" else it } val widget = view.buildProgressTrackerWidget(label, tracker.topLevelTracker) println("Added: $tracker, $widget") viewModel.trackerBoxes[tracker] = widget diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt index 07ee598a81..8c08df8531 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt @@ -86,7 +86,7 @@ class VisualiserViewModel { try { return node.place.coordinate.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) } catch(e: Exception) { - throw Exception("Cannot project ${node.info.legalIdentity}", e) + throw Exception("Cannot project ${node.started!!.info.legalIdentity}", e) } } diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt index c56c684096..19f5bd869c 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt @@ -42,7 +42,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>()) override fun startMainSimulation(): CompletableFuture { - om = JacksonSupport.createInMemoryMapper(InMemoryIdentityService((banks + regulators + networkMap + ratesOracle).map { it.info.legalIdentityAndCert }, trustRoot = DUMMY_CA.certificate)) + om = JacksonSupport.createInMemoryMapper(InMemoryIdentityService((banks + regulators + networkMap.internals + ratesOracle).map { it.started!!.info.legalIdentityAndCert }, trustRoot = DUMMY_CA.certificate)) registerFinanceJSONMappers(om) return startIRSDealBetween(0, 1).thenCompose { @@ -89,8 +89,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten private fun doNextFixing(i: Int, j: Int): CompletableFuture? { println("Doing a fixing between $i and $j") - val node1: SimulatedNode = banks[i] - val node2: SimulatedNode = banks[j] + val node1 = banks[i].started!! + val node2 = banks[j].started!! val swaps = node1.database.transaction { @@ -100,14 +100,14 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten // Do we have any more days left in this deal's lifetime? If not, return. val nextFixingDate = theDealRef.state.data.calculation.nextFixingDate() ?: return null - extraNodeLabels[node1] = "Fixing event on $nextFixingDate" - extraNodeLabels[node2] = "Fixing event on $nextFixingDate" + extraNodeLabels[node1.internals] = "Fixing event on $nextFixingDate" + extraNodeLabels[node2.internals] = "Fixing event on $nextFixingDate" // Complete the future when the state has been consumed on both nodes val futA = node1.services.vaultService.whenConsumed(theDealRef.ref) val futB = node2.services.vaultService.whenConsumed(theDealRef.ref) - showConsensusFor(listOf(node1, node2, regulators[0])) + showConsensusFor(listOf(node1.internals, node2.internals, regulators[0])) // For some reason the first fix is always before the effective date. if (nextFixingDate > currentDateAndTime.toLocalDate()) @@ -117,11 +117,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten } private fun startIRSDealBetween(i: Int, j: Int): CompletableFuture { - val node1: SimulatedNode = banks[i] - val node2: SimulatedNode = banks[j] + val node1 = banks[i].started!! + val node2 = banks[j].started!! - extraNodeLabels[node1] = "Setting up deal" - extraNodeLabels[node2] = "Setting up deal" + extraNodeLabels[node1.internals] = "Setting up deal" + extraNodeLabels[node2.internals] = "Setting up deal" // We load the IRS afresh each time because the leg parts of the structure aren't data classes so they don't // have the convenient copy() method that'd let us make small adjustments. Instead they're partly mutable. @@ -134,8 +134,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten irs.fixedLeg.fixedRatePayer = node1.info.legalIdentity irs.floatingLeg.floatingRatePayer = node2.info.legalIdentity - node1.registerInitiatedFlow(FixingFlow.Fixer::class.java) - node2.registerInitiatedFlow(FixingFlow.Fixer::class.java) + node1.internals.registerInitiatedFlow(FixingFlow.Fixer::class.java) + node2.internals.registerInitiatedFlow(FixingFlow.Fixer::class.java) @InitiatingFlow class StartDealFlow(val otherParty: Party, @@ -147,7 +147,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten @InitiatedBy(StartDealFlow::class) class AcceptDealFlow(otherParty: Party) : Acceptor(otherParty) - val acceptDealFlows: Observable = node2.registerInitiatedFlow(AcceptDealFlow::class.java) + val acceptDealFlows: Observable = node2.internals.registerInitiatedFlow(AcceptDealFlow::class.java) @Suppress("UNCHECKED_CAST") val acceptorTxFuture = acceptDealFlows.toFuture().toCompletableFuture().thenCompose { @@ -155,7 +155,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten } showProgressFor(listOf(node1, node2)) - showConsensusFor(listOf(node1, node2, regulators[0])) + showConsensusFor(listOf(node1.internals, node2.internals, regulators[0])) val instigator = StartDealFlow( node2.info.legalIdentity, diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt index 1316064947..42cb3f3064 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt @@ -2,6 +2,7 @@ package net.corda.netmap.simulation import net.corda.core.flows.FlowLogic import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.CityDatabase import net.corda.core.node.WorldMapLocation @@ -9,6 +10,7 @@ import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.utilities.ProgressTracker import net.corda.irs.api.NodeInterestRates +import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.network.NetworkMapService import net.corda.node.services.statemachine.StateMachineManager @@ -55,6 +57,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, advertisedServices: Set, id: Int, overrideServices: Map?, entropyRoot: BigInteger) : MockNetwork.MockNode(config, mockNet, networkMapAddress, advertisedServices, id, overrideServices, entropyRoot) { + override val started: StartedNode? get() = uncheckedCast(super.started) override fun findMyLocation(): WorldMapLocation? { return configuration.myLegalName.locality.let { CityDatabase[it] } } @@ -78,7 +81,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, fun createAll(): List { return bankLocations.mapIndexed { i, _ -> // Use deterministic seeds so the simulation is stable. Needed so that party owning keys are stable. - mockNet.createNode(networkMap.network.myAddress, nodeFactory = this, start = false, entropyRoot = BigInteger.valueOf(i.toLong())) + mockNet.createUnstartedNode(networkMap.network.myAddress, nodeFactory = this, entropyRoot = BigInteger.valueOf(i.toLong())) } } } @@ -120,8 +123,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, baseDirectory = config.baseDirectory, myLegalName = RATES_SERVICE_NAME) return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - override fun start() { - super.start() + override fun start() = super.start().apply { registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java) registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java) javaClass.classLoader.getResourceAsStream("net/corda/irs/simulation/example.rates.txt").use { @@ -153,11 +155,11 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, // This one must come first. val networkMap = mockNet.createNode(nodeFactory = NetworkMapNodeFactory, advertisedServices = ServiceInfo(NetworkMapService.type)) val notary = mockNet.createNode(networkMap.network.myAddress, nodeFactory = NotaryNodeFactory, advertisedServices = ServiceInfo(SimpleNotaryService.type)) - val regulators = listOf(mockNet.createNode(networkMap.network.myAddress, start = false, nodeFactory = RegulatorFactory)) - val ratesOracle = mockNet.createNode(networkMap.network.myAddress, start = false, nodeFactory = RatesOracleFactory) + val regulators = listOf(mockNet.createUnstartedNode(networkMap.network.myAddress, nodeFactory = RegulatorFactory)) + val ratesOracle = mockNet.createUnstartedNode(networkMap.network.myAddress, nodeFactory = RatesOracleFactory) // All nodes must be in one of these two lists for the purposes of the visualiser tool. - val serviceProviders: List = listOf(notary, ratesOracle, networkMap) + val serviceProviders: List = listOf(notary.internals, ratesOracle, networkMap.internals) val banks: List = bankFactory.createAll() val clocks = (serviceProviders + regulators + banks).map { it.platformClock as TestClock } @@ -225,10 +227,10 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, return null } - protected fun showProgressFor(nodes: List) { + protected fun showProgressFor(nodes: List>) { nodes.forEach { node -> node.smm.changes.filter { it is StateMachineManager.Change.Add }.subscribe { - linkFlowProgress(node, it.logic) + linkFlowProgress(node.internals, it.logic) } } } @@ -244,7 +246,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, protected fun showConsensusFor(nodes: List) { val node = nodes.first() - node.smm.changes.filter { it is StateMachineManager.Change.Add }.first().subscribe { + node.started!!.smm.changes.filter { it is StateMachineManager.Change.Add }.first().subscribe { linkConsensus(nodes, it.logic) } } diff --git a/samples/network-visualiser/src/test/kotlin/net/corda/netmap/simulation/IRSSimulationTest.kt b/samples/network-visualiser/src/test/kotlin/net/corda/netmap/simulation/IRSSimulationTest.kt index 3cab8ae88f..bb843910e1 100644 --- a/samples/network-visualiser/src/test/kotlin/net/corda/netmap/simulation/IRSSimulationTest.kt +++ b/samples/network-visualiser/src/test/kotlin/net/corda/netmap/simulation/IRSSimulationTest.kt @@ -8,7 +8,7 @@ class IRSSimulationTest { // TODO: These tests should be a lot more complete. @Test fun `runs to completion`() { - LogHelper.setLevel("+messages") + LogHelper.setLevel("+messages") // FIXME: Don't manipulate static state in tests. val sim = IRSSimulation(false, false, null) val future = sim.start() while (!future.isDone) sim.iterate() diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index cac912faec..b6a30a17bf 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -39,16 +39,16 @@ class TraderDemoTest : NodeBasedTest() { val bankNodeFuture = startNode(BOC.name, rpcUsers = listOf(bankUser)) val (nodeA, nodeB, bankNode) = listOf(nodeAFuture, nodeBFuture, bankNodeFuture, notaryFuture).map { it.getOrThrow() } - nodeA.registerInitiatedFlow(BuyerFlow::class.java) - nodeA.registerCustomSchemas(setOf(CashSchemaV1)) - nodeB.registerCustomSchemas(setOf(CashSchemaV1, CommercialPaperSchemaV1)) + nodeA.internals.registerInitiatedFlow(BuyerFlow::class.java) + nodeA.internals.registerCustomSchemas(setOf(CashSchemaV1)) + nodeB.internals.registerCustomSchemas(setOf(CashSchemaV1, CommercialPaperSchemaV1)) val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map { - val client = CordaRPCClient(it.configuration.rpcAddress!!, initialiseSerialization = false) + val client = CordaRPCClient(it.internals.configuration.rpcAddress!!, initialiseSerialization = false) client.start(demoUser.username, demoUser.password).proxy } val nodeBankRpc = let { - val client = CordaRPCClient(bankNode.configuration.rpcAddress!!, initialiseSerialization = false) + val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!, initialiseSerialization = false) client.start(bankUser.username, bankUser.password).proxy } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 5c410218cd..ec5d31d3f5 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -24,6 +24,7 @@ import net.corda.core.node.services.ServiceType import net.corda.core.utilities.* import net.corda.node.internal.Node import net.corda.node.internal.NodeStartup +import net.corda.node.internal.StartedNode import net.corda.node.services.config.* import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.RaftValidatingNotaryService @@ -204,11 +205,11 @@ sealed class NodeHandle { override val rpc: CordaRPCOps, override val configuration: FullNodeConfiguration, override val webAddress: NetworkHostAndPort, - val node: Node, + val node: StartedNode, val nodeThread: Thread ) : NodeHandle() { override fun stop(): CordaFuture { - node.stop() + node.dispose() with(nodeThread) { interrupt() join() @@ -823,7 +824,7 @@ class DriverDSL( shutdownManager.registerShutdown( nodeAndThreadFuture.map { (node, thread) -> { - node.stop() + node.dispose() thread.interrupt() } } @@ -880,16 +881,15 @@ class DriverDSL( executorService: ScheduledExecutorService, nodeConf: FullNodeConfiguration, config: Config - ): CordaFuture> { + ): CordaFuture, Thread>> { return executorService.fork { log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}") // Write node.conf writeConfig(nodeConf.baseDirectory, "node.conf", config) // TODO pass the version in? - val node = Node(nodeConf, nodeConf.calculateServices(), MOCK_VERSION_INFO, initialiseSerialization = false) - node.start() + val node = Node(nodeConf, nodeConf.calculateServices(), MOCK_VERSION_INFO, initialiseSerialization = false).start() val nodeThread = thread(name = nodeConf.myLegalName.organisation) { - node.run() + node.internals.run() } node to nodeThread }.flatMap { nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 4883aaba4b..cdfe1ea007 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -10,6 +10,7 @@ import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.createDirectories import net.corda.core.internal.createDirectory +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient @@ -19,6 +20,7 @@ import net.corda.core.node.WorldMapLocation import net.corda.core.node.services.* import net.corda.core.utilities.* import net.corda.node.internal.AbstractNode +import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.E2ETestKeyManagementService @@ -40,6 +42,10 @@ import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +fun StartedNode.pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? { + return (network as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block) +} + /** * A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing. * Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem or an in @@ -143,6 +149,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, mockNet.sharedUserCount.incrementAndGet() mockNet.sharedServerThread } + override val started: StartedNode? get() = uncheckedCast(super.started) + override fun start(): StartedNode = uncheckedCast(super.start()) // We only need to override the messaging service here, as currently everything that hits disk does so // through the java.nio API which we are already mocking via Jimfs. @@ -165,7 +173,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, val caCertificates: Array = listOf(legalIdentity.certificate.cert, clientCa?.certificate?.cert) .filterNotNull() .toTypedArray() - val identityService = PersistentIdentityService(setOf(info.legalIdentityAndCert), + val identityService = PersistentIdentityService(setOf(legalIdentity), trustRoot = trustRoot, caCertificates = *caCertificates) services.networkMapCache.partyNodes.forEach { identityService.verifyAndRegisterIdentity(it.legalIdentityAndCert) } services.networkMapCache.changed.subscribe { mapChange -> @@ -186,8 +194,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // Nothing to do } - override fun makeNetworkMapService() { - inNodeNetworkMapService = InMemoryNetworkMapService(services, platformVersion) + override fun makeNetworkMapService(): NetworkMapService { + return InMemoryNetworkMapService(services, platformVersion) } override fun makeServiceEntries(): List { @@ -233,10 +241,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // It is used from the network visualiser tool. @Suppress("unused") val place: WorldMapLocation get() = findMyLocation()!! - fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? { - return (network as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block) - } - fun disableDBCloseOnStop() { runOnStop.remove(dbCloser) } @@ -256,13 +260,13 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return BFTNonValidatingNotaryService(services, object : BFTSMaRt.Cluster { override fun waitUntilAllReplicasHaveInitialized() { val clusterNodes = mockNet.nodes.filter { - services.notaryIdentityKey in it.info.serviceIdentities(BFTNonValidatingNotaryService.type).map { it.owningKey } + services.notaryIdentityKey in it.started!!.info.serviceIdentities(BFTNonValidatingNotaryService.type).map { it.owningKey } } if (clusterNodes.size != configuration.notaryClusterAddresses.size) { throw IllegalStateException("Unable to enumerate all nodes in BFT cluster.") } clusterNodes.forEach { - val notaryService = it.smm.findServices { it is BFTNonValidatingNotaryService }.single() as BFTNonValidatingNotaryService + val notaryService = it.started!!.smm.findServices { it is BFTNonValidatingNotaryService }.single() as BFTNonValidatingNotaryService notaryService.waitUntilReplicaHasInitialized() } } @@ -279,6 +283,22 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, } } + fun createUnstartedNode(networkMapAddress: SingleMessageRecipient? = null, forcedID: Int? = null, + legalName: CordaX500Name? = null, overrideServices: Map? = null, + entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()), + vararg advertisedServices: ServiceInfo, + configOverrides: (NodeConfiguration) -> Any? = {}): MockNode { + return createUnstartedNode(networkMapAddress, forcedID, defaultFactory, legalName, overrideServices, entropyRoot, *advertisedServices, configOverrides = configOverrides) + } + + fun createUnstartedNode(networkMapAddress: SingleMessageRecipient? = null, forcedID: Int? = null, nodeFactory: Factory, + legalName: CordaX500Name? = null, overrideServices: Map? = null, + entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()), + vararg advertisedServices: ServiceInfo, + configOverrides: (NodeConfiguration) -> Any? = {}): N { + return createNodeImpl(networkMapAddress, forcedID, nodeFactory, false, legalName, overrideServices, entropyRoot, advertisedServices, configOverrides) + } + /** * Returns a node, optionally created by the passed factory method. * @param overrideServices a set of service entries to use in place of the node's default service entries, @@ -288,19 +308,27 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, * @param configOverrides add/override behaviour of the [NodeConfiguration] mock object. */ fun createNode(networkMapAddress: SingleMessageRecipient? = null, forcedID: Int? = null, - start: Boolean = true, legalName: CordaX500Name? = null, overrideServices: Map? = null, + legalName: CordaX500Name? = null, overrideServices: Map? = null, entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()), vararg advertisedServices: ServiceInfo, - configOverrides: (NodeConfiguration) -> Any? = {}): MockNode { - return createNode(networkMapAddress, forcedID, defaultFactory, start, legalName, overrideServices, entropyRoot, *advertisedServices, configOverrides = configOverrides) + configOverrides: (NodeConfiguration) -> Any? = {}): StartedNode { + return createNode(networkMapAddress, forcedID, defaultFactory, legalName, overrideServices, entropyRoot, *advertisedServices, configOverrides = configOverrides) } /** Like the other [createNode] but takes a [Factory] and propagates its [MockNode] subtype. */ fun createNode(networkMapAddress: SingleMessageRecipient? = null, forcedID: Int? = null, nodeFactory: Factory, - start: Boolean = true, legalName: CordaX500Name? = null, overrideServices: Map? = null, + legalName: CordaX500Name? = null, overrideServices: Map? = null, entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()), vararg advertisedServices: ServiceInfo, - configOverrides: (NodeConfiguration) -> Any? = {}): N { + configOverrides: (NodeConfiguration) -> Any? = {}): StartedNode { + return uncheckedCast(createNodeImpl(networkMapAddress, forcedID, nodeFactory, true, legalName, overrideServices, entropyRoot, advertisedServices, configOverrides).started)!! + } + + private fun createNodeImpl(networkMapAddress: SingleMessageRecipient?, forcedID: Int?, nodeFactory: Factory, + start: Boolean, legalName: CordaX500Name?, overrideServices: Map?, + entropyRoot: BigInteger, + advertisedServices: Array, + configOverrides: (NodeConfiguration) -> Any?): N { val id = forcedID ?: nextNodeId++ val config = testNodeConfiguration( baseDirectory = baseDirectory(id).createDirectories(), @@ -344,7 +372,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, * A bundle that separates the generic user nodes and service-providing nodes. A real network might not be so * clearly separated, but this is convenient for testing. */ - data class BasketOfNodes(val partyNodes: List, val notaryNode: MockNode, val mapNode: MockNode) + data class BasketOfNodes(val partyNodes: List>, val notaryNode: StartedNode, val mapNode: StartedNode) /** * Sets up a network with the requested number of nodes (defaulting to two), with one or more service nodes that @@ -361,9 +389,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, val mapNode = createNode(nodeFactory = nodeFactory, advertisedServices = ServiceInfo(NetworkMapService.type)) val mapAddress = mapNode.network.myAddress val notaryNode = createNode(mapAddress, nodeFactory = nodeFactory, overrideServices = notaryOverride, advertisedServices = notaryServiceInfo) - val nodes = ArrayList() - repeat(numPartyNodes) { - nodes += createPartyNode(mapAddress) + val nodes = (1..numPartyNodes).map { + createPartyNode(mapAddress) } return BasketOfNodes(nodes, notaryNode, mapNode) } @@ -371,21 +398,21 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, fun createNotaryNode(networkMapAddress: SingleMessageRecipient? = null, legalName: CordaX500Name? = null, overrideServices: Map? = null, - serviceName: CordaX500Name? = null): MockNode { + serviceName: CordaX500Name? = null): StartedNode { return createNode(networkMapAddress, legalName = legalName, overrideServices = overrideServices, advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type, serviceName))) } fun createPartyNode(networkMapAddress: SingleMessageRecipient, legalName: CordaX500Name? = null, - overrideServices: Map? = null): MockNode { + overrideServices: Map? = null): StartedNode { return createNode(networkMapAddress, legalName = legalName, overrideServices = overrideServices) } @Suppress("unused") // This is used from the network visualiser tool. fun addressToNode(msgRecipient: MessageRecipients): MockNode { return when (msgRecipient) { - is SingleMessageRecipient -> nodes.single { it.network.myAddress == msgRecipient } + is SingleMessageRecipient -> nodes.single { it.started!!.network.myAddress == msgRecipient } is InMemoryMessagingNetwork.ServiceHandle -> { nodes.filter { it.advertisedServices.any { it == msgRecipient.service.info } }.firstOrNull() ?: throw IllegalArgumentException("Couldn't find node advertising service with info: ${msgRecipient.service.info} ") @@ -396,11 +423,11 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, fun startNodes() { require(nodes.isNotEmpty()) - nodes.forEach { if (!it.started) it.start() } + nodes.forEach { it.started ?: it.start() } } fun stopNodes() { - nodes.forEach { if (it.started) it.stop() } + nodes.forEach { it.started?.dispose() } if (initialiseSerialization) resetTestSerialization() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index e0fdcefced..38694c75b9 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -10,6 +10,7 @@ import net.corda.core.node.services.ServiceType import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.organisation import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.configOf @@ -30,7 +31,6 @@ import org.bouncycastle.asn1.x500.X500Name import org.junit.After import org.junit.Rule import org.junit.rules.TemporaryFolder -import java.util.* import java.util.concurrent.Executors import kotlin.concurrent.thread @@ -46,10 +46,10 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { @JvmField val tempFolder = TemporaryFolder() - private val nodes = ArrayList() - private var _networkMapNode: Node? = null + private val nodes = mutableListOf>() + private var _networkMapNode: StartedNode? = null - val networkMapNode: Node get() = _networkMapNode ?: startNetworkMapNode() + val networkMapNode: StartedNode get() = _networkMapNode ?: startNetworkMapNode() init { System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase()) @@ -62,12 +62,12 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { @After fun stopAllNodes() { val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size) - nodes.map { shutdownExecutor.fork(it::stop) }.transpose().getOrThrow() + nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow() // Wait until ports are released val portNotBoundChecks = nodes.flatMap { listOf( - it.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }, - it.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) } + it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }, + it.internals.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) } ) }.filterNotNull() nodes.clear() @@ -90,7 +90,7 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { platformVersion: Int = 1, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap()): Node { + configOverrides: Map = emptyMap()): StartedNode { check(_networkMapNode == null || _networkMapNode!!.info.legalIdentity.name == legalName) return startNodeInternal(legalName, platformVersion, advertisedServices + ServiceInfo(NetworkMapService.type), rpcUsers, configOverrides).apply { _networkMapNode = this @@ -104,7 +104,7 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { rpcUsers: List = emptyList(), configOverrides: Map = emptyMap(), noNetworkMap: Boolean = false, - waitForConnection: Boolean = true): CordaFuture { + waitForConnection: Boolean = true): CordaFuture> { val networkMapConf = if (noNetworkMap) { // Nonexistent network map service address. mapOf( @@ -116,7 +116,7 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { } else { mapOf( "networkMapService" to mapOf( - "address" to networkMapNode.configuration.p2pAddress.toString(), + "address" to networkMapNode.internals.configuration.p2pAddress.toString(), "legalName" to networkMapNode.info.legalIdentity.name.toString() ) ) @@ -128,12 +128,12 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { rpcUsers, networkMapConf + configOverrides, noNetworkMap) - return if (waitForConnection) node.nodeReadyFuture.map { node } else doneFuture(node) + return if (waitForConnection) node.internals.nodeReadyFuture.map { node } else doneFuture(node) } fun startNotaryCluster(notaryName: CordaX500Name, clusterSize: Int, - serviceType: ServiceType = RaftValidatingNotaryService.type): CordaFuture> { + serviceType: ServiceType = RaftValidatingNotaryService.type): CordaFuture>> { ServiceIdentityGenerator.generateToDisk( (0 until clusterSize).map { baseDirectory(getX500Name(O = "${notaryName.organisation}-$it", L = notaryName.locality, C = notaryName.country)) }, serviceType.id, @@ -170,7 +170,7 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { advertisedServices: Set, rpcUsers: List, configOverrides: Map, - noNetworkMap: Boolean = false): Node { + noNetworkMap: Boolean = false): StartedNode { val baseDirectory = baseDirectory(legalName.x500Name).createDirectories() val localPort = getFreeLocalPorts("localhost", 2) val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() @@ -189,11 +189,10 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { val parsedConfig = config.parseAs() val node = Node(parsedConfig, parsedConfig.calculateServices(), MOCK_VERSION_INFO.copy(platformVersion = platformVersion), - initialiseSerialization = false) - node.start() + initialiseSerialization = false).start() nodes += node thread(name = legalName.organisation) { - node.run() + node.internals.run() } return node }