Merge remote-tracking branch 'corda/master' into christians_os_merge_20171031

This commit is contained in:
Christian Sailer
2017-11-02 14:41:18 +00:00
1211 changed files with 3627 additions and 432161 deletions

View File

@ -95,6 +95,9 @@ dependencies {
compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"
// Artemis: for reliable p2p message queues.
// TODO: remove the forced update of commons-collections and beanutils when artemis updates them
compile "org.apache.commons:commons-collections4:${commons_collections_version}"
compile "commons-beanutils:commons-beanutils:${beanutils_version}"
compile "org.apache.activemq:artemis-server:${artemis_version}"
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
runtime ("org.apache.activemq:artemis-amqp-protocol:${artemis_version}") {

View File

@ -102,7 +102,7 @@ class NodePerformanceTests {
@Test
fun `self pay rate`() {
driver(startNodesInProcess = true) {
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance")) {
val a = startNotaryNode(
DUMMY_NOTARY.name,
rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<CashIssueFlow>(), startFlowPermission<CashPaymentFlow>())))

View File

@ -20,22 +20,24 @@ import net.corda.core.utilities.seconds
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.nodeapi.User
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.*
import net.corda.testing.driver.DriverDSLExposedInterface
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.eventually
import net.corda.testing.node.MockServices
import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.net.URLClassLoader
import java.nio.file.Files
import kotlin.test.assertFailsWith
class AttachmentLoadingTests : TestDependencyInjectionBase() {
class AttachmentLoadingTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private class Services : MockServices() {
private val provider = CordappProviderImpl(CordappLoader.createDevMode(listOf(isolatedJAR)), attachments)
private val cordapp get() = provider.cordapps.first()

View File

@ -8,53 +8,53 @@ import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.*
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.chooseIdentity
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.dummyCommand
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class RaftNotaryServiceTests : NodeBasedTest(listOf("net.corda.testing.contracts")) {
class RaftNotaryServiceTests {
private val notaryName = CordaX500Name(RaftValidatingNotaryService.id, "RAFT Notary Service", "London", "GB")
@Test
fun `detect double spend`() {
val (bankA) = listOf(
startNode(DUMMY_BANK_A.name),
startNotaryCluster(notaryName, 3).map { it.first() }
).transpose().getOrThrow()
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts")) {
val (notaryParty) = startNotaryCluster(notaryName, 3).getOrThrow()
val bankA = startNode(providedName = DUMMY_BANK_A.name).map { (it as NodeHandle.InProcess).node }.getOrThrow()
val notaryParty = bankA.services.networkMapCache.getNotary(notaryName)!!
val inputState = issueState(bankA, notaryParty)
val inputState = issueState(bankA, notaryParty)
val firstTxBuilder = TransactionBuilder(notaryParty)
.addInputState(inputState)
.addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder)
val firstTxBuilder = TransactionBuilder(notaryParty)
.addInputState(inputState)
.addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder)
val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx))
firstSpend.resultFuture.getOrThrow()
val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx))
firstSpend.resultFuture.getOrThrow()
val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, bankA.info.chooseIdentity())
addOutputState(dummyState, DummyContract.PROGRAM_ID)
addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
this
}
val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx))
val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, bankA.info.chooseIdentity())
addOutputState(dummyState, DummyContract.PROGRAM_ID)
addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
this
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
}
val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx))
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
}
private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> {

View File

@ -13,14 +13,12 @@ import net.corda.testing.ALICE
import net.corda.testing.ALICE_KEY
import net.corda.testing.DEV_TRUST_ROOT
import net.corda.testing.getTestPartyAndCertificate
import net.corda.testing.internal.NodeBasedTest
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.contentOf
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.observers.TestSubscriber
import rx.schedulers.TestScheduler
import java.nio.file.Path
@ -29,42 +27,36 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
class NodeInfoWatcherTest : NodeBasedTest() {
@Rule
@JvmField
var folder = TemporaryFolder()
lateinit var keyManagementService: KeyManagementService
lateinit var nodeInfoPath: Path
val scheduler = TestScheduler()
val testSubscriber = TestSubscriber<NodeInfo>()
// Object under test
lateinit var nodeInfoWatcher: NodeInfoWatcher
companion object {
val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0)
}
private lateinit var keyManagementService: KeyManagementService
private lateinit var nodeInfoPath: Path
private val scheduler = TestScheduler()
private val testSubscriber = TestSubscriber<NodeInfo>()
// Object under test
private lateinit var nodeInfoWatcher: NodeInfoWatcher
@Before
fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler = scheduler)
nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler)
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
}
@Test
fun `save a NodeInfo`() {
assertEquals(0,
folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService)
assertEquals(0, tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfo, keyManagementService)
val nodeInfoFiles = folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
assertEquals(1, nodeInfoFiles.size)
val fileName = nodeInfoFiles.first()
assertTrue(fileName.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX))
val file = (folder.root.path / fileName).toFile()
val file = (tempFolder.root.path / fileName).toFile()
// Just check that something is written, another tests verifies that the written value can be read back.
assertThat(contentOf(file)).isNotEmpty()
}

View File

@ -1,41 +1,28 @@
package net.corda.node.services.network
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.*
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.chooseIdentity
import net.corda.testing.internal.NodeBasedTest
import org.junit.Before
import org.junit.Test
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertTrue
private const val BRIDGE_RETRY_MS: Long = 100
class PersistentNetworkMapCacheTest : NodeBasedTest() {
private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
private val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
private val addressesMap = HashMap<CordaX500Name, NetworkHostAndPort>()
private val infos: MutableSet<NodeInfo> = HashSet()
companion object {
val logger = loggerFor<PersistentNetworkMapCacheTest>()
}
@Before
fun start() {
val nodes = startNodesWithPort(partiesList)
nodes.forEach { it.internals.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting.
nodes.forEach {
infos.add(it.info)
addressesMap[it.info.chooseIdentity().name] = it.info.addresses[0]
@ -44,8 +31,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
@Test
fun `get nodes by owning key and by name, no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByLegalIdentity(alice.info.chooseIdentity())
@ -56,8 +43,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
@Test
fun `get nodes by address no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
fun `get nodes by address`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByAddress(alice.info.addresses[0])
@ -66,150 +53,20 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
@Test
fun `restart node with DB map cache and no network map`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
fun `restart node with DB map cache`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val partyNodes = alice.services.networkMapCache.allNodes
assertEquals(NullNetworkMapService, alice.inNodeNetworkMapService)
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
}
@Test
fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = true)
assertTrue(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
nodes.forEach {
val partyNodes = it.services.networkMapCache.allNodes
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
}
checkConnectivity(nodes)
}
@Test
fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
assertTrue(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
nodes.forEach {
val partyNodes = it.services.networkMapCache.allNodes
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
}
checkConnectivity(nodes)
}
@Test
fun `start node and network map communicate`() {
val parties = partiesList.subList(0, 2)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
checkConnectivity(nodes)
}
@Test
fun `start node without networkMapService and no database - fail`() {
assertFails { startNode(CHARLIE.name, noNetworkMap = true).getOrThrow(2.seconds) }
}
@Test
fun `new node joins network without network map started`() {
fun customNodesStart(parties: List<Party>): List<StartedNode<Node>> =
startNodesWithPort(parties, noNetworkMap = false, customRetryIntervalMs = BRIDGE_RETRY_MS)
val parties = partiesList.subList(1, partiesList.size)
// Start 2 nodes pointing at network map, but don't start network map service.
val otherNodes = customNodesStart(parties)
otherNodes.forEach { node ->
assertTrue(infos.any { it.legalIdentitiesAndCerts.toSet() == node.info.legalIdentitiesAndCerts.toSet() })
}
// Start node that is not in databases of other nodes. Point to NMS. Which has't started yet.
val charlie = customNodesStart(listOf(CHARLIE)).single()
otherNodes.forEach {
assertThat(it.services.networkMapCache.allNodes).doesNotContain(charlie.info)
}
// Start Network Map and see that charlie node appears in caches.
val nms = customNodesStart(listOf(DUMMY_NOTARY)).single()
nms.internals.startupComplete.get()
assertTrue(nms.inNodeNetworkMapService != NullNetworkMapService)
assertTrue(infos.any { it.legalIdentities.toSet() == nms.info.legalIdentities.toSet() })
otherNodes.forEach {
assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() })
}
charlie.internals.nodeReadyFuture.get() // Finish registration.
val allTheStartedNodesPopulation = otherNodes.plus(charlie).plus(nms)
// This is prediction of the longest time it will take to get the cluster into a stable state such that further
// testing can be performed upon it
val maxInstabilityInterval = BRIDGE_RETRY_MS * allTheStartedNodesPopulation.size * 30
logger.info("Instability interval is set to: $maxInstabilityInterval ms")
// TODO: Re-visit this sort of re-try for stable cluster once network map redesign is finished.
eventually<AssertionError, Unit>(Duration.ofMillis(maxInstabilityInterval)) {
logger.info("Checking connectivity")
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
logger.info("Loading caches")
val cacheA = otherNodes[0].services.networkMapCache.allNodes
val cacheB = otherNodes[1].services.networkMapCache.allNodes
val cacheC = charlie.services.networkMapCache.allNodes
logger.info("Performing verification")
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
assertThat(cacheB).contains(charlie.info)
assertEquals(cacheA.toSet(), cacheB.toSet())
assertEquals(cacheA.toSet(), cacheC.toSet())
}
}
// HELPERS
// Helper function to restart nodes with the same host and port.
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
private fun startNodesWithPort(nodesToStart: List<Party>, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
return nodesToStart.map { party ->
val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
(customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
if (party == DUMMY_NOTARY) {
startNetworkMapNode(party.name, configOverrides = configOverrides)
} else {
startNode(party.name,
configOverrides = configOverrides,
noNetworkMap = noNetworkMap,
waitForConnection = false).getOrThrow()
}
}
}
// Check that nodes are functional, communicate each with each.
private fun checkConnectivity(nodes: List<StartedNode<*>>) {
nodes.forEach { node1 ->
nodes.forEach { node2 ->
if (!(node1 === node2)) { // Do not check connectivity to itself
node2.internals.registerInitiatedFlow(SendBackFlow::class.java)
val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
}
}
}
}
@InitiatingFlow
private class SendFlow(val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
logger.info("SEND FLOW to $otherParty")
logger.info("Party key ${otherParty.owningKey.toBase58String()}")
val session = initiateFlow(otherParty)
return session.sendAndReceive<String>("Hi!").unwrap { it }
}
}
@InitiatedBy(SendFlow::class)
private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("SEND BACK FLOW to ${otherSideSession.counterparty}")
logger.info("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}")
otherSideSession.send("Hello!")
startNode(party.name, configOverrides = configOverrides)
}
}
}

View File

@ -5,22 +5,20 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.chooseIdentity
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class FlowVersioningTest : NodeBasedTest() {
@Test
fun `getFlowContext returns the platform version for core flows`() {
val (alice, bob) = listOf(
startNode(ALICE.name, platformVersion = 2),
startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow()
val alice = startNode(ALICE.name, platformVersion = 2)
val bob = startNode(BOB.name, platformVersion = 3)
bob.internals.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow)
val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow(
PretendInitiatingCoreFlow(bob.info.chooseIdentity())).resultFuture.getOrThrow()

View File

@ -28,8 +28,8 @@ import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.chooseIdentity
import net.corda.testing.configureTestSSL
import net.corda.testing.internal.NodeBasedTest
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.SimpleString
@ -52,7 +52,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Before
fun start() {
alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser).getOrThrow()
alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser)
attacker = createAttacker()
startAttacker(attacker)
}
@ -87,7 +87,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Test
fun `create queue for peer which has not been communicated with`() {
val bob = startNode(BOB.name).getOrThrow()
val bob = startNode(BOB.name)
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
}
@ -219,7 +219,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode(BOB.name).getOrThrow()
val bob = startNode(BOB.name)
bob.internals.registerInitiatedFlow(ReceiveFlow::class.java)
val bobParty = bob.info.chooseIdentity()
// Perform a protocol exchange to force the peer queue to be created

View File

@ -3,10 +3,8 @@ package net.corda.services.messaging
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.elapsedTime
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.randomOrNull
import net.corda.core.internal.times
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
@ -14,114 +12,122 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.*
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.ALICE
import net.corda.testing.chooseIdentity
import net.corda.testing.driver.DriverDSLExposedInterface
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
import org.junit.Ignore
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class P2PMessagingTest : NodeBasedTest() {
class P2PMessagingTest {
private companion object {
val DISTRIBUTED_SERVICE_NAME = CordaX500Name(RaftValidatingNotaryService.id, "DistributedService", "London", "GB")
}
@Test
fun `network map will work after restart`() {
val identities = listOf(DUMMY_BANK_A, DUMMY_BANK_B, DUMMY_NOTARY)
fun startNodes() = identities.map { startNode(it.name) }.transpose()
val startUpDuration = elapsedTime { startNodes().getOrThrow() }
// Start the network map a second time - this will restore message queues from the journal.
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
clearAllNodeInfoDb() // Clear network map data from nodes databases.
stopAllNodes()
startNodes().getOrThrow(timeout = startUpDuration * 3)
}
@Ignore
@Test
fun `communicating with a distributed service which we're part of`() {
val distributedService = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0])
driver(startNodesInProcess = true) {
val distributedService = startDistributedService()
assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0])
}
}
@Test
fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() {
val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val serviceAddress = alice.services.networkMapCache.run {
val notaryParty = notaryIdentities.randomOrNull()!!
alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
driver(startNodesInProcess = true) {
val distributedServiceNodes = startDistributedService()
val alice = startAlice()
val serviceAddress = alice.services.networkMapCache.run {
val notaryParty = notaryIdentities.randomOrNull()!!
alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
}
val dummyTopic = "dummy.topic"
val responseMessage = "response"
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
// Send a single request with retry
val responseFuture = with(alice.network) {
val request = TestRequest(replyTo = myAddress)
val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
send(msg, serviceAddress, retryId = request.sessionID)
responseFuture
}
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// The request wasn't successful.
assertThat(responseFuture.isDone).isFalse()
crashingNodes.ignoreRequests = false
// The retry should be successful.
val response = responseFuture.getOrThrow(10.seconds)
assertThat(response).isEqualTo(responseMessage)
}
val dummyTopic = "dummy.topic"
val responseMessage = "response"
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
// Send a single request with retry
val responseFuture = with(alice.network) {
val request = TestRequest(replyTo = myAddress)
val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
send(msg, serviceAddress, retryId = request.sessionID)
responseFuture
}
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// The request wasn't successful.
assertThat(responseFuture.isDone).isFalse()
crashingNodes.ignoreRequests = false
// The retry should be successful.
val response = responseFuture.getOrThrow(10.seconds)
assertThat(response).isEqualTo(responseMessage)
}
@Test
fun `distributed service request retries are persisted across client node restarts`() {
val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val serviceAddress = alice.services.networkMapCache.run {
val notaryParty = notaryIdentities.randomOrNull()!!
alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
driver(startNodesInProcess = true) {
val distributedServiceNodes = startDistributedService()
val alice = startAlice()
val serviceAddress = alice.services.networkMapCache.run {
val notaryParty = notaryIdentities.randomOrNull()!!
alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
}
val dummyTopic = "dummy.topic"
val responseMessage = "response"
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
val sessionId = random63BitValue()
// Send a single request with retry
with(alice.network) {
val request = TestRequest(sessionId, myAddress)
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
send(msg, serviceAddress, retryId = request.sessionID)
}
// Wait until the first request is received
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.dispose()
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
crashingNodes.ignoreRequests = false
// Restart the node and expect a response
val aliceRestarted = startAlice()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
assertThat(response).isEqualTo(responseMessage)
}
}
val dummyTopic = "dummy.topic"
val responseMessage = "response"
private fun DriverDSLExposedInterface.startDistributedService(): List<StartedNode<Node>> {
return startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2)
.getOrThrow()
.second
.map { (it as NodeHandle.InProcess).node }
}
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
val sessionId = random63BitValue()
// Send a single request with retry
with(alice.network) {
val request = TestRequest(sessionId, myAddress)
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
send(msg, serviceAddress, retryId = request.sessionID)
}
// Wait until the first request is received
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.dispose()
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
crashingNodes.ignoreRequests = false
// Restart the node and expect a response
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
assertThat(response).isEqualTo(responseMessage)
private fun DriverDSLExposedInterface.startAlice(): StartedNode<Node> {
return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
.map { (it as NodeHandle.InProcess).node }
.getOrThrow()
}
data class CrashingNodes(
@ -201,4 +207,4 @@ class P2PMessagingTest : NodeBasedTest() {
@CordaSerializable
private data class TestRequest(override val sessionID: Long = random63BitValue(),
override val replyTo: SingleMessageRecipient) : ServiceRequestMessage
}
}

View File

@ -1,79 +0,0 @@
package net.corda.services.messaging
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.NodeInfo
import net.corda.core.internal.cert
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.ActiveMqServerConfiguration
import net.corda.node.services.config.BridgeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.testing.*
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.node.SimpleNode
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.concurrent.TimeoutException
class P2PSecurityTest : NodeBasedTest() {
@Test
fun `incorrect legal name for the network map service config`() {
val incorrectNetworkMapName = CordaX500Name(organisation = "NetworkMap-${random63BitValue()}",
locality = "London", country = "GB")
val node = startNode(BOB.name, configOverrides = mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.internals.configuration.p2pAddress.toString(),
"legalName" to incorrectNetworkMapName.toString()
)
))
// The connection will be rejected as the legal name doesn't match
assertThatThrownBy { node.getOrThrow() }.hasMessageContaining(incorrectNetworkMapName.toString())
}
@Test
fun `register with the network map service using a legal name different from the TLS CN`() {
startSimpleNode(DUMMY_BANK_A.name, DEV_TRUST_ROOT.cert).use {
// Register with the network map using a different legal name
val response = it.registerWithNetworkMap(DUMMY_BANK_B.name)
// We don't expect a response because the network map's host verification will prevent a connection back
// to the attacker as the TLS CN will not match the legal name it has just provided
assertThatExceptionOfType(TimeoutException::class.java).isThrownBy {
response.getOrThrow(2.seconds)
}
}
}
private fun startSimpleNode(legalName: CordaX500Name,
trustRoot: X509Certificate): SimpleNode {
val config = testNodeConfiguration(
baseDirectory = baseDirectory(legalName),
myLegalName = legalName).also {
doReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.chooseIdentity().name)).whenever(it).networkMapService
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(1001, 2, 3.4))).whenever(it).activeMQServer
}
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
return SimpleNode(config, trustRoot = trustRoot).apply { start() }
}
private fun SimpleNode.registerWithNetworkMap(registrationName: CordaX500Name): CordaFuture<NetworkMapService.RegistrationResponse> {
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), listOf(legalIdentity), 1, serial = 1)
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress)
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress)
}
}

View File

@ -4,8 +4,7 @@ import joptsimple.OptionParser
import joptsimple.util.EnumConverter
import net.corda.core.internal.div
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.nodeapi.config.parseAs
import net.corda.node.services.config.parseAsNodeConfiguration
import org.slf4j.event.Level
import java.io.PrintStream
import java.nio.file.Path
@ -71,6 +70,5 @@ data class CmdLineOptions(val baseDirectory: Path,
val sshdServer: Boolean,
val justGenerateNodeInfo: Boolean) {
fun loadConfig() = ConfigHelper
.loadConfig(baseDirectory, configFile)
.parseAs<FullNodeConfiguration>()
.loadConfig(baseDirectory, configFile).parseAsNodeConfiguration()
}

View File

@ -12,14 +12,10 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StateLoader
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
@ -90,7 +86,6 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
* Marked as SingletonSerializeAsToken to prevent the invisible reference to AbstractNode in the ServiceHub accidentally
* sweeping up the Node into the Kryo checkpoint serialization via any flows holding a reference to ServiceHub.
*/
// TODO: Where this node is the initial network map service, currently no networkMapService is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(config: NodeConfiguration,
@ -111,7 +106,6 @@ abstract class AbstractNode(config: NodeConfiguration,
override val checkpointStorage: CheckpointStorage,
override val smm: StateMachineManager,
override val attachments: NodeAttachmentService,
override val inNodeNetworkMapService: NetworkMapService,
override val network: MessagingService,
override val database: CordaPersistence,
override val rpcOps: CordaRPCOps,
@ -119,14 +113,8 @@ abstract class AbstractNode(config: NodeConfiguration,
internal val schedulerService: NodeSchedulerService) : StartedNode<N> {
override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {}
}
// TODO: Persist this, as well as whether the node is registered.
/**
* Sequence number of changes sent to the network map service, when registering/de-registering this node.
*/
var networkMapSeq: Long = 1
protected abstract val log: Logger
protected abstract val networkMapAddress: SingleMessageRecipient?
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
@ -146,7 +134,6 @@ abstract class AbstractNode(config: NodeConfiguration,
protected lateinit var smm: StateMachineManager
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var inNodeNetworkMapService: NetworkMapService
protected lateinit var network: MessagingService
protected val runOnStop = ArrayList<() -> Any?>()
protected lateinit var database: CordaPersistence
@ -232,10 +219,12 @@ abstract class AbstractNode(config: NodeConfiguration,
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps, flowStarter, schedulerService)
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, schedulerService)
}
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured())
services.networkMapCache.addNode(info)
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
return startedImpl.apply {
database.transaction {
smm.start(tokenizableServices)
@ -492,7 +481,7 @@ abstract class AbstractNode(config: NodeConfiguration,
services.auditService, services.monitoringService, networkMapCache, services.schemaService,
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
services, cordappProvider, this)
makeNetworkServices(network, networkMapCache, tokenizableServices)
makeNetworkServices(tokenizableServices)
return tokenizableServices
}
@ -553,16 +542,7 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
private fun setupInNodeNetworkMapService(networkMapCache: NetworkMapCacheInternal) {
inNodeNetworkMapService =
if (configuration.networkMapService == null && !configuration.noNetworkMapServiceMode)
makeNetworkMapService(network, networkMapCache)
else
NullNetworkMapService
}
private fun makeNetworkServices(network: MessagingService, networkMapCache: NetworkMapCacheInternal, tokenizableServices: MutableList<Any>) {
setupInNodeNetworkMapService(networkMapCache)
private fun makeNetworkServices(tokenizableServices: MutableList<Any>) {
configuration.notary?.let {
val notaryService = makeCoreNotaryService(it)
tokenizableServices.add(notaryService)
@ -573,40 +553,6 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
private fun registerWithNetworkMapIfConfigured(): CordaFuture<Unit> {
services.networkMapCache.addNode(info)
// In the unit test environment, we may sometimes run without any network map service
return if (networkMapAddress == null && inNodeNetworkMapService == NullNetworkMapService) {
services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else {
val netMapRegistration = registerWithNetworkMap()
// We may want to start node immediately with database data and not wait for network map registration (but send it either way).
// So we are ready to go.
if (services.networkMapCache.loadDBSuccess) {
log.info("Node successfully loaded network map data from the database.")
doneFuture(Unit)
} else {
netMapRegistration
}
}
}
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
protected open fun registerWithNetworkMap(): CordaFuture<Unit> {
val address: SingleMessageRecipient = networkMapAddress ?:
network.getAddressOfParty(PartyInfo.SingleNode(services.myInfo.legalIdentitiesAndCerts.first().party, info.addresses)) as SingleMessageRecipient
// Register for updates, even if we're the one running the network map.
return sendNetworkMapRegistration(address).flatMap { (error) ->
check(error == null) { "Unable to register with the network map service: $error" }
// The future returned addMapService will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
services.networkMapCache.addMapService(network, address, true, null)
}
}
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): CordaFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
@ -619,14 +565,10 @@ abstract class AbstractNode(config: NodeConfiguration,
/** Return list of node's addresses. It's overridden in MockNetwork as we don't have real addresses for MockNodes. */
protected abstract fun myAddresses(): List<NetworkHostAndPort>
/** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): CordaFuture<Unit> {
if (services.networkMapCache.loadDBSuccess || configuration.noNetworkMapServiceMode) {
return doneFuture(Unit)
} else {
open protected fun checkNetworkMapIsInitialized() {
if (!services.networkMapCache.loadDBSuccess || configuration.noNetworkMapServiceMode) {
// TODO: There should be a consistent approach to configuration error exceptions.
throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " +
"has any other map node been configured.")
throw NetworkMapCacheEmptyException()
}
}
@ -803,9 +745,9 @@ abstract class AbstractNode(config: NodeConfiguration,
return flowFactories[initiatingFlowClass]
}
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
database.transaction {
super.recordTransactions(notifyVault, txs)
super.recordTransactions(statesToRecord, txs)
}
}
@ -818,3 +760,8 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva
return serverThread.fetchFrom { smm.startFlow(logic, flowInitiator, ourIdentity) }
}
}
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException: Exception()

View File

@ -5,8 +5,6 @@ import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.uncheckedCast
@ -22,7 +20,6 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectRequestProperty
@ -37,7 +34,6 @@ import net.corda.node.utilities.TestClock
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ShutdownHook
@ -62,7 +58,7 @@ import kotlin.system.exitProcess
*
* @param configuration This is typically loaded from a TypeSafe HOCON configuration file.
*/
open class Node(configuration: FullNodeConfiguration,
open class Node(override val configuration: NodeConfiguration,
versionInfo: VersionInfo,
val initialiseSerialization: Boolean = true,
cordappLoader: CordappLoader = makeCordappLoader(configuration)
@ -84,7 +80,7 @@ open class Node(configuration: FullNodeConfiguration,
exitProcess(1)
}
private fun createClock(configuration: FullNodeConfiguration): Clock {
private fun createClock(configuration: NodeConfiguration): Clock {
return if (configuration.useTestClock) TestClock() else NodeClock()
}
@ -99,8 +95,6 @@ open class Node(configuration: FullNodeConfiguration,
}
override val log: Logger get() = logger
override val configuration get() = super.configuration as FullNodeConfiguration // Necessary to avoid init order NPE.
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
@ -153,23 +147,16 @@ open class Node(configuration: FullNodeConfiguration,
override fun makeMessagingService(legalIdentity: PartyAndCertificate): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers)
val (serverAddress, advertisedAddress) = with(configuration) {
if (messagingServerAddress != null) {
// External broker
messagingServerAddress to messagingServerAddress
} else {
makeLocalMessageBroker() to getAdvertisedAddress()
}
}
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val advertisedAddress = configuration.messagingServerAddress ?: getAdvertisedAddress()
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) legalIdentity.owningKey else null
return NodeMessagingClient(
configuration,
versionInfo,
serverAddress,
myIdentityOrNullIfNetworkMapService,
legalIdentity.owningKey,
serverThread,
database,
nodeReadyFuture,
@ -201,16 +188,14 @@ open class Node(configuration: FullNodeConfiguration,
/**
* Checks whether the specified [host] is a public IP address or hostname. If not, tries to discover the current
* machine's public IP address to be used instead. It first looks through the network interfaces, and if no public IP
* is found, asks the network map service to provide it.
* machine's public IP address to be used instead by looking through the network interfaces.
* TODO this code used to rely on the networkmap node, we might want to look at a different solution.
*/
private fun tryDetectIfNotPublicHost(host: String): String? {
if (!AddressUtils.isPublic(host)) {
val foundPublicIP = AddressUtils.tryDetectPublicIP()
if (foundPublicIP == null) {
networkMapAddress?.let { return discoverPublicHost(it.hostAndPort) }
} else {
if (foundPublicIP != null) {
log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.")
return foundPublicIP.hostAddress
}
@ -276,15 +261,6 @@ open class Node(configuration: FullNodeConfiguration,
(network as NodeMessagingClient).start(rpcOps, userService)
}
/**
* Insert an initial step in the registration process which will throw an exception if a non-recoverable error is
* encountered when trying to connect to the network map node.
*/
override fun registerWithNetworkMap(): CordaFuture<Unit> {
val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: doneFuture(Unit)
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
override fun myAddresses(): List<NetworkHostAndPort> {
val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress
return listOf(address.hostAndPort)

View File

@ -1,16 +1,13 @@
package net.corda.node.internal
import com.jcabi.manifests.Manifests
import com.jcraft.jsch.JSch
import com.jcraft.jsch.JSchException
import com.typesafe.config.ConfigException
import joptsimple.OptionException
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.utilities.loggerFor
import net.corda.node.*
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.RelayConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.transactions.bftSMaRtSerialFilter
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
@ -20,7 +17,6 @@ import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.slf4j.bridge.SLF4JBridgeHandler
import sun.misc.VMSupport
import java.io.IOException
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.net.InetAddress
@ -88,11 +84,11 @@ open class NodeStartup(val args: Array<String>) {
exitProcess(0)
}
open protected fun preNetworkRegistration(conf: FullNodeConfiguration) = Unit
open protected fun preNetworkRegistration(conf: NodeConfiguration) = Unit
open protected fun createNode(conf: FullNodeConfiguration, versionInfo: VersionInfo): Node = Node(conf, versionInfo)
open protected fun createNode(conf: NodeConfiguration, versionInfo: VersionInfo): Node = Node(conf, versionInfo)
open protected fun startNode(conf: FullNodeConfiguration, versionInfo: VersionInfo, startTime: Long, cmdlineOptions: CmdLineOptions) {
open protected fun startNode(conf: NodeConfiguration, versionInfo: VersionInfo, startTime: Long, cmdlineOptions: CmdLineOptions) {
val node = createNode(conf, versionInfo)
if (cmdlineOptions.justGenerateNodeInfo) {
// Perform the minimum required start-up logic to be able to write a nodeInfo to disk
@ -122,14 +118,14 @@ open class NodeStartup(val args: Array<String>) {
startedNode.internals.run()
}
open protected fun logStartupInfo(versionInfo: VersionInfo, cmdlineOptions: CmdLineOptions, conf: FullNodeConfiguration) {
open protected fun logStartupInfo(versionInfo: VersionInfo, cmdlineOptions: CmdLineOptions, conf: NodeConfiguration) {
logger.info("Vendor: ${versionInfo.vendor}")
logger.info("Release: ${versionInfo.releaseVersion}")
logger.info("Platform Version: ${versionInfo.platformVersion}")
logger.info("Revision: ${versionInfo.revision}")
val info = ManagementFactory.getRuntimeMXBean()
logger.info("PID: ${info.name.split("@").firstOrNull()}") // TODO Java 9 has better support for this
logger.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().path}")
logger.info("Main class: ${NodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().path}")
logger.info("CommandLine Args: ${info.inputArguments.joinToString(" ")}")
logger.info("Application Args: ${args.joinToString(" ")}")
logger.info("bootclasspath: ${info.bootClassPath}")
@ -144,7 +140,7 @@ open class NodeStartup(val args: Array<String>) {
logger.info("Starting as node on ${conf.p2pAddress}")
}
open protected fun maybeRegisterWithNetworkAndExit(cmdlineOptions: CmdLineOptions, conf: FullNodeConfiguration) {
open protected fun maybeRegisterWithNetworkAndExit(cmdlineOptions: CmdLineOptions, conf: NodeConfiguration) {
if (!cmdlineOptions.isRegistration) return
println()
println("******************************************************************")
@ -156,7 +152,7 @@ open class NodeStartup(val args: Array<String>) {
exitProcess(0)
}
open protected fun loadConfigFile(cmdlineOptions: CmdLineOptions): FullNodeConfiguration {
open protected fun loadConfigFile(cmdlineOptions: CmdLineOptions): NodeConfiguration {
try {
return cmdlineOptions.loadConfig()
} catch (e: ConfigException) {
@ -165,7 +161,7 @@ open class NodeStartup(val args: Array<String>) {
}
}
open protected fun banJavaSerialisation(conf: FullNodeConfiguration) {
open protected fun banJavaSerialisation(conf: NodeConfiguration) {
SerialFilter.install(if (conf.notary?.bftSMaRt != null) ::bftSMaRtSerialFilter else ::defaultSerialFilter)
}

View File

@ -13,7 +13,6 @@ import net.corda.core.serialization.SerializeAsToken
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
@ -25,7 +24,6 @@ interface StartedNode<out N : AbstractNode> {
val checkpointStorage: CheckpointStorage
val smm: StateMachineManager
val attachments: NodeAttachmentService
val inNodeNetworkMapService: NetworkMapService
val network: MessagingService
val database: CordaPersistence
val rpcOps: CordaRPCOps

View File

@ -274,6 +274,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
/** @param rootPackageName only this package and subpackages may be extracted from [url], or null to allow all packages. */
private class RestrictedURL(val url: URL, rootPackageName: String?) {
val qualifiedNamePrefix = rootPackageName?.let { it + '.' } ?: ""
override fun toString() = url.toString()
}
private inner class RestrictedScanResult(private val scanResult: ScanResult, private val qualifiedNamePrefix: String) {

View File

@ -7,6 +7,7 @@ import net.corda.core.contracts.requireThat
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.ContractUpgradeUtils
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
@ -16,8 +17,7 @@ import net.corda.core.transactions.SignedTransaction
class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = subFlow(ReceiveTransactionFlow(sender))
serviceHub.recordTransactions(stx)
subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT))
}
}

View File

@ -18,6 +18,12 @@ interface RPCUserService {
// TODO Or ditch this and consider something like Apache Shiro
// TODO Need access to permission checks from inside flows and at other point during audit checking.
class RPCUserServiceImpl(override val users: List<User>) : RPCUserService {
init {
users.forEach {
require(it.username.matches("\\w+".toRegex())) { "Username ${it.username} contains invalid characters" }
}
}
override fun getUser(username: String): User? = users.find { it.username == username }
}

View File

@ -11,10 +11,10 @@ import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.TransactionStorage
@ -32,35 +32,12 @@ import net.corda.node.utilities.CordaPersistence
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
/**
* Deregister from updates from the given map service.
* @param network the network messaging service.
* @param mapParty the network map service party to fetch current state from.
*/
fun deregisterForUpdates(network: MessagingService, mapParty: Party): CordaFuture<Unit>
/**
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
* @param network the network messaging service.
* @param networkMapAddress the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates.
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
*/
fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): CordaFuture<Unit>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)
/** Removes a node from the local cache. */
fun removeNode(node: NodeInfo)
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
@VisibleForTesting
fun runWithoutMapService()
/** Indicates if loading network map data from database was successful. */
val loadDBSuccess: Boolean
}
@ -93,7 +70,7 @@ interface ServiceHubInternal : ServiceHub {
val database: CordaPersistence
val configuration: NodeConfiguration
override val cordappProvider: CordappProviderInternal
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
require(txs.any()) { "No transactions passed in for recording" }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
@ -105,9 +82,43 @@ interface ServiceHubInternal : ServiceHub {
log.warn("Transactions recorded from outside of a state machine")
}
if (notifyVault) {
if (statesToRecord != StatesToRecord.NONE) {
val toNotify = recordedTransactions.map { if (it.isNotaryChangeTransaction()) it.notaryChangeTx else it.tx }
vaultService.notifyAll(toNotify)
// When the user has requested StatesToRecord.ALL we may end up recording and relationally mapping states
// that do not involve us and that we cannot sign for. This will break coin selection and thus a warning
// is present in the documentation for this feature (see the "Observer nodes" tutorial on docs.corda.net).
//
// The reason for this is three-fold:
//
// 1) We are putting in place the observer mode feature relatively quickly to meet specific customer
// launch target dates.
//
// 2) The right design for vaults which mix observations and relevant states isn't entirely clear yet.
//
// 3) If we get the design wrong it could create security problems and business confusions.
//
// Back in the bitcoinj days I did add support for "watching addresses" to the wallet code, which is the
// Bitcoin equivalent of observer nodes:
//
// https://bitcoinj.github.io/working-with-the-wallet#watching-wallets
//
// The ability to have a wallet containing both irrelevant and relevant states complicated everything quite
// dramatically, even methods as basic as the getBalance() API which required additional modes to let you
// query "balance I can spend" vs "balance I am observing". In the end it might have been better to just
// require the user to create an entirely separate wallet for observing with.
//
// In Corda we don't support a single node having multiple vaults (at the time of writing), and it's not
// clear that's the right way to go: perhaps adding an "origin" column to the VAULT_STATES table is a better
// solution. Then you could select subsets of states depending on where the report came from.
//
// The risk of doing this is that apps/developers may use 'canned SQL queries' not written by us that forget
// to add a WHERE clause for the origin column. Those queries will seem to work most of the time until
// they're run on an observer node and mix in irrelevant data. In the worst case this may result in
// erroneous data being reported to the user, which could cause security problems.
//
// Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely
// to make writes to the ledger very often or at all, we choose to punt this issue for the time being.
vaultService.notifyAll(statesToRecord, toNotify)
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.services.api
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultService
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
@ -12,8 +13,8 @@ interface VaultServiceInternal : VaultService {
* indicate whether an update consists entirely of regular or notary change transactions, which may require
* different processing logic.
*/
fun notifyAll(txns: Iterable<CoreTransaction>)
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>)
/** Same as notifyAll but with a single transaction. */
fun notify(tx: CoreTransaction) = notifyAll(listOf(tx))
fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx))
}

View File

@ -1,26 +1,23 @@
package net.corda.node.services.config
import com.typesafe.config.Config
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.User
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.parseAs
import java.net.URL
import java.nio.file.Path
import java.util.*
data class DevModeOptions(val disableCheckpointChecker: Boolean = false)
interface NodeConfiguration : NodeSSLConfiguration {
// myLegalName should be only used in the initial network registration, we should use the name from the certificate instead of this.
// TODO: Remove this so we don't accidentally use this identity in the code?
val myLegalName: CordaX500Name
/**
* If null then configure the node to run as the netwok map service, otherwise use this to connect to the network map
* service.
*/
val networkMapService: NetworkMapInfo?
val noNetworkMapServiceMode: Boolean
val minimumPlatformVersion: Int
val emailAddress: String
val exportJMXto: String
@ -28,6 +25,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val database: Properties?
val rpcUsers: List<User>
val devMode: Boolean
val devModeOptions: DevModeOptions?
val certificateSigningService: URL
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType
@ -35,6 +33,13 @@ interface NodeConfiguration : NodeSSLConfiguration {
val notary: NotaryConfig?
val activeMQServer: ActiveMqServerConfiguration
val additionalNodeInfoPollingFrequencyMsec: Long
val useHTTPS: Boolean
val p2pAddress: NetworkHostAndPort
val rpcAddress: NetworkHostAndPort?
val messagingServerAddress: NetworkHostAndPort?
// TODO Move into DevModeOptions
val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true
}
data class NotaryConfig(val validating: Boolean,
@ -68,7 +73,9 @@ data class BridgeConfiguration(val retryIntervalMs: Long,
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
data class FullNodeConfiguration(
fun Config.parseAsNodeConfiguration(): NodeConfiguration = this.parseAs<NodeConfigurationImpl>()
data class NodeConfigurationImpl(
/** This is not retrieved from the config file but rather from a command line argument. */
override val baseDirectory: Path,
override val myLegalName: CordaX500Name,
@ -78,25 +85,27 @@ data class FullNodeConfiguration(
override val dataSourceProperties: Properties,
override val database: Properties?,
override val certificateSigningService: URL,
override val networkMapService: NetworkMapInfo?,
override val noNetworkMapServiceMode: Boolean = false,
override val minimumPlatformVersion: Int = 1,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,
// TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration.
// Then rename this to messageRedeliveryDelay and make it of type Duration
override val messageRedeliveryDelaySeconds: Int = 30,
val useHTTPS: Boolean,
val p2pAddress: NetworkHostAndPort,
val rpcAddress: NetworkHostAndPort?,
override val useHTTPS: Boolean,
override val p2pAddress: NetworkHostAndPort,
override val rpcAddress: NetworkHostAndPort?,
val relay: RelayConfiguration?,
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
val messagingServerAddress: NetworkHostAndPort?,
override val messagingServerAddress: NetworkHostAndPort?,
override val notary: NotaryConfig?,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
override val devMode: Boolean = false,
val useTestClock: Boolean = false,
val detectPublicIp: Boolean = true,
override val devModeOptions: DevModeOptions? = null,
override val useTestClock: Boolean = false,
override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis()
) : NodeConfiguration {
override val exportJMXto: String get() = "http"
@ -104,10 +113,7 @@ data class FullNodeConfiguration(
init {
// This is a sanity feature do not remove.
require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" }
// TODO Move this to ArtemisMessagingServer
rpcUsers.forEach {
require(it.username.matches("\\w+".toRegex())) { "Username ${it.username} contains invalid characters" }
}
require(devModeOptions == null || devMode) { "Cannot use devModeOptions outside of dev mode" }
require(myLegalName.commonName == null) { "Common name must be null: $myLegalName" }
require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" }
}

View File

@ -11,6 +11,7 @@ import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.NODE_DATABASE_PREFIX

View File

@ -5,6 +5,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.NODE_DATABASE_PREFIX
@ -50,7 +51,10 @@ class PersistentKeyManagementService(val identityService: IdentityService,
fun createKeyMap(): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toStringShort() },
fromPersistentEntity = { Pair(Crypto.decodePublicKey(it.publicKey), Crypto.decodePrivateKey(it.privateKey)) },
fromPersistentEntity = {
Pair(Crypto.decodePublicKey(it.publicKey),
Crypto.decodePrivateKey(it.privateKey))
},
toPersistentEntity = { key: PublicKey, value: PrivateKey ->
PersistentKey(key, value)
},

View File

@ -1,21 +1,20 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import io.netty.handler.ssl.SslHandler
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.noneOrSingle
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parsePublicKeyBase58
import net.corda.node.internal.Node
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
@ -37,12 +36,10 @@ import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.message.impl.CoreMessage
import org.apache.activemq.artemis.core.remoting.impl.netty.*
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.remoting.*
@ -111,14 +108,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private val _networkMapConnectionFuture = config.networkMapService?.let { openFuture<Unit>() }
/**
* A [ListenableFuture] which completes when the server successfully connects to the network map node. If a
* non-recoverable error is encountered then the Future will complete with an exception.
*/
val networkMapConnectionFuture: CordaFuture<Unit>? get() = _networkMapConnectionFuture
private var networkChangeHandle: Subscription? = null
private val nodeRunsNetworkMapService = config.networkMapService == null
init {
config.baseDirectory.requireOnDefaultFileSystem()
@ -132,8 +122,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
// Deploy bridge to the network map service
config.networkMapService?.let { deployBridge(NetworkMapAddress(it.address), setOf(it.legalName)) }
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
running = true
}
@ -158,7 +146,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
if (nodeRunsNetworkMapService) registerPostQueueCreationCallback { handleIpDetectionRequest(it.toString()) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
activeMQServer.start()
@ -247,12 +234,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove the NODE_USER role once the webserver doesn't need it
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
if (nodeRunsNetworkMapService) {
securityRoles["$IP_REQUEST_PREFIX*"] = setOf(
nodeInternalRole,
restrictedRole(PEER_ROLE, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
)
}
for ((username) in userService.users) {
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
nodeInternalRole,
@ -330,7 +311,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
log.debug { "Updating bridges on network map change: ${change.node}" }
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val address = node.addresses.first()
return node.legalIdentitiesAndCerts.map { getArtemisPeerAddress(it.party, address, config.networkMapService?.legalName) }.asSequence()
return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence()
}
fun deployBridges(node: NodeInfo) {
@ -409,47 +390,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
// This is called on one of Artemis' background threads
internal fun hostVerificationFail(expectedLegalNames: Set<CordaX500Name>, errorMsg: String?) {
log.error(errorMsg)
if (config.networkMapService?.legalName in expectedLegalNames) {
// If the peer that failed host verification was the network map node then we're in big trouble and need to bail!
_networkMapConnectionFuture!!.setException(IOException("${config.networkMapService} failed host verification check"))
}
}
// This is called on one of Artemis' background threads
internal fun onTcpConnection(peerLegalName: CordaX500Name) {
if (peerLegalName == config.networkMapService?.legalName) {
_networkMapConnectionFuture!!.set(Unit)
}
}
private fun handleIpDetectionRequest(queueName: String) {
fun getRemoteAddress(requestId: String): String? {
val session = activeMQServer.sessions.first {
it.getMetaData(ipDetectRequestProperty) == requestId
}
return session.remotingConnection.remoteAddress
}
fun sendResponse(remoteAddress: String?) {
val responseMessage = CoreMessage(random63BitValue(), 0).apply {
putStringProperty(ipDetectResponseProperty, remoteAddress)
}
val routingContext = RoutingContextImpl(null)
val queue = activeMQServer.locateQueue(SimpleString(queueName))
queue.route(responseMessage, routingContext)
activeMQServer.postOffice.processRoute(responseMessage, routingContext, true)
}
if (!queueName.startsWith(IP_REQUEST_PREFIX)) return
val requestId = queueName.substringAfter(IP_REQUEST_PREFIX)
val remoteAddress = getRemoteAddress(requestId)
log.debug { "Detected remote address $remoteAddress for request $requestId" }
sendResponse(remoteAddress)
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
@ -473,7 +413,10 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
private val server = configuration[ArtemisMessagingServer::class.java.name] as ArtemisMessagingServer
companion object {
private val log = loggerFor<VerifyingNettyConnector>()
}
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
override fun createConnection(): Connection? {
@ -504,10 +447,9 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates)
server.onTcpConnection(peerLegalName)
} catch (e: IllegalArgumentException) {
connection.close()
server.hostVerificationFail(expectedLegalNames, e.message)
log.error(e.message)
return null
}
}

View File

@ -48,7 +48,6 @@ import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.security.auth.x500.X500Principal
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -76,7 +75,7 @@ import javax.security.auth.x500.X500Principal
class NodeMessagingClient(override val config: NodeConfiguration,
private val versionInfo: VersionInfo,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey?,
private val myIdentity: PublicKey,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: CordaPersistence,
private val networkMapRegistrationFuture: CordaFuture<Unit>,
@ -172,14 +171,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
override val myAddress: SingleMessageRecipient = if (myIdentity != null) {
NodeAddress.asSingleNode(myIdentity, advertisedAddress)
} else {
NetworkMapAddress(advertisedAddress)
}
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -634,9 +626,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port)
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.SingleNode -> {
getArtemisPeerAddress(partyInfo.party, partyInfo.addresses.first(), config.networkMapService?.legalName)
}
is PartyInfo.SingleNode -> NodeAddress(partyInfo.party.owningKey, partyInfo.addresses.first())
is PartyInfo.DistributedNode -> ServiceAddress(partyInfo.party.owningKey)
}
}

View File

@ -1,18 +1,14 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
@ -21,29 +17,18 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.schemas.NodeInfoSchemaV1
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.toBase58String
import net.corda.node.services.api.NetworkCacheException
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.*
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashMap
@ -78,7 +63,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
val logger = loggerFor<PersistentNetworkMapCache>()
}
private var registeredForPush = false
// TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in
// next PR that gets rid of services. These maps are used only for queries by service.
protected val registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
@ -88,6 +72,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
// TODO revisit the logic under which nodeReady and loadDBSuccess are set.
// with the NetworkMapService redesign their meaning is not too well defined.
private val _registrationFuture = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> get() = _registrationFuture
private var _loadDBSuccess: Boolean = false
@ -152,38 +138,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
}
}
override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC,
data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
network.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch (e: NodeMapException) {
logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch (e: Exception) {
logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress)
val future = network.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
nodes?.forEach { processRegistration(it) }
Unit
}
_registrationFuture.captureLater(future.map { null })
return future
}
override fun addNode(node: NodeInfo) {
logger.info("Adding node with info: $node")
synchronized(_changed) {
@ -210,6 +164,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Previous node was identical to incoming one - doing nothing")
}
}
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready.
_registrationFuture.set(null)
logger.info("Done adding node with info: $node")
}
@ -225,49 +181,11 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Done removing node with info: $node")
}
/**
* Unsubscribes from updates from the given map service.
* @param mapParty the network map service party to listen to updates from.
*/
override fun deregisterForUpdates(network: MessagingService, mapParty: Party): CordaFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
val address = getPartyInfo(mapParty)?.let { network.getAddressOfParty(it) } ?:
throw IllegalArgumentException("Can't deregister for updates, don't know the party: $mapParty")
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
if (it.confirmed) Unit else throw NetworkCacheException.DeregistrationFailed()
}
_registrationFuture.captureLater(future.map { null })
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
try {
val reg = req.wireReg.verified()
processRegistration(reg)
} catch (e: SignatureException) {
throw NodeMapException.InvalidSignature()
}
}
override val allNodes: List<NodeInfo>
get() = database.transaction {
getAllInfos(session).map { it.toNodeInfo() }
}
private fun processRegistration(reg: NodeRegistration) {
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
@VisibleForTesting
override fun runWithoutMapService() {
_registrationFuture.set(null)
}
// Changes related to NetworkMap redesign
// TODO It will be properly merged into network map cache after services removal.
@ -288,14 +206,10 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Loaded node info: $nodeInfo")
val node = nodeInfo.toNodeInfo()
addNode(node)
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready.
} catch (e: Exception) {
logger.warn("Exception parsing network map from the database.", e)
}
}
if (loadDBSuccess) {
_registrationFuture.set(null) // Useful only if we don't have NetworkMapService configured so StateMachineManager can start.
}
}
private fun updateInfoDB(nodeInfo: NodeInfo) {

View File

@ -8,10 +8,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.utilities.UntrustworthyData
class FlowSessionImpl(
override val counterparty: Party
) : FlowSession() {
class FlowSessionImpl(override val counterparty: Party) : FlowSession() {
internal lateinit var stateMachine: FlowStateMachine<*>
internal lateinit var sessionFlow: FlowLogic<*>
@ -57,5 +54,7 @@ class FlowSessionImpl(
@Suspendable
override fun send(payload: Any) = send(payload, maySkipCheckpoint = false)
override fun toString() = "Flow session with $counterparty"
}

View File

@ -33,10 +33,7 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.TopicSession
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch
@ -46,7 +43,6 @@ import rx.subjects.PublishSubject
import java.io.NotSerializableException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import javax.annotation.concurrent.ThreadSafe
@ -92,7 +88,11 @@ class StateMachineManagerImpl(
private val scheduler = FiberScheduler()
private val mutex = ThreadBox(InnerState())
// This thread (only enabled in dev mode) deserialises checkpoints in the background to shake out bugs in checkpoint restore.
private val checkpointCheckerThread = if (serviceHub.configuration.devMode) Executors.newSingleThreadExecutor() else null
private val checkpointCheckerThread = if (serviceHub.configuration.devModeOptions?.disableCheckpointChecker != true) {
newNamedSingleThreadExecutor("CheckpointChecker")
} else {
null
}
@Volatile private var unrestorableCheckpoints = false

View File

@ -90,6 +90,7 @@ object BFTSMaRt {
private fun awaitClientConnectionToCluster() {
// TODO: Hopefully we only need to wait for the client's initial connection to the cluster, and this method can be moved to some startup code.
// TODO: Investigate ConcurrentModificationException in this method.
while (true) {
val inactive = sessionTable.entries.mapNotNull { if (it.value.channel.isActive) null else it.key }
if (inactive.isEmpty()) break

View File

@ -13,6 +13,7 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.messaging.DataFeed
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultQueryException
import net.corda.core.node.services.vault.*
import net.corda.core.schemas.PersistentStateRef
@ -50,8 +51,7 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
}
/**
* Currently, the node vault service is a very simple RDBMS backed implementation. It will change significantly when
* we add further functionality as the design for the vault and vault service matures.
* The vault service handles storage, retrieval and querying of states.
*
* This class needs database transactions to be in-flight during method calls and init, and will throw exceptions if
* this is not the case.
@ -59,8 +59,12 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
* TODO: have transaction storage do some caching.
*/
class NodeVaultService(private val clock: Clock, private val keyManagementService: KeyManagementService, private val stateLoader: StateLoader, hibernateConfig: HibernateConfiguration) : SingletonSerializeAsToken(), VaultServiceInternal {
class NodeVaultService(
private val clock: Clock,
private val keyManagementService: KeyManagementService,
private val stateLoader: StateLoader,
hibernateConfig: HibernateConfiguration
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
val log = loggerFor<NodeVaultService>()
}
@ -118,7 +122,10 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
override val updates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _updatesInDbTx }
override fun notifyAll(txns: Iterable<CoreTransaction>) {
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
if (statesToRecord == StatesToRecord.NONE)
return
// It'd be easier to just group by type, but then we'd lose ordering.
val regularTxns = mutableListOf<WireTransaction>()
val notaryChangeTxns = mutableListOf<NotaryChangeWireTransaction>()
@ -128,30 +135,33 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
is WireTransaction -> {
regularTxns.add(tx)
if (notaryChangeTxns.isNotEmpty()) {
notifyNotaryChange(notaryChangeTxns.toList())
notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
notaryChangeTxns.clear()
}
}
is NotaryChangeWireTransaction -> {
notaryChangeTxns.add(tx)
if (regularTxns.isNotEmpty()) {
notifyRegular(regularTxns.toList())
notifyRegular(regularTxns.toList(), statesToRecord)
regularTxns.clear()
}
}
}
}
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList())
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList())
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList(), statesToRecord)
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
}
private fun notifyRegular(txns: Iterable<WireTransaction>) {
private fun notifyRegular(txns: Iterable<WireTransaction>, statesToRecord: StatesToRecord) {
fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState> {
val myKeys = keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } })
val ourNewStates = tx.outputs.
filter { isRelevant(it.data, myKeys.toSet()) }.
map { tx.outRef<ContractState>(it.data) }
val ourNewStates = when (statesToRecord) {
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
StatesToRecord.ONLY_RELEVANT -> tx.outputs.filter { isRelevant(it.data, myKeys.toSet()) }
StatesToRecord.ALL_VISIBLE -> tx.outputs
}.map { tx.outRef<ContractState>(it.data) }
// Retrieve all unconsumed states for this transaction's inputs
val consumedStates = loadStates(tx.inputs)
@ -169,7 +179,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
processAndNotify(netDelta)
}
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>) {
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>, statesToRecord: StatesToRecord) {
fun makeUpdate(tx: NotaryChangeWireTransaction): Vault.Update<ContractState> {
// We need to resolve the full transaction here because outputs are calculated from inputs
// We also can't do filtering beforehand, since output encumbrance pointers get recalculated based on
@ -178,7 +188,12 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
val myKeys = keyManagementService.filterMyKeys(ltx.outputs.flatMap { it.data.participants.map { it.owningKey } })
val (consumedStateAndRefs, producedStates) = ltx.inputs.
zip(ltx.outputs).
filter { (_, output) -> isRelevant(output.data, myKeys.toSet()) }.
filter { (_, output) ->
if (statesToRecord == StatesToRecord.ONLY_RELEVANT)
isRelevant(output.data, myKeys.toSet())
else
true
}.
unzip()
val producedStateAndRefs = producedStates.map { ltx.outRef<ContractState>(it.data) }

View File

@ -0,0 +1,29 @@
package net.corda.node.utilities
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
/**
* Utility class that allows to give threads arbitrary name prefixes when they are created
* via an executor. It will use an underlying thread factory to create the actual thread
* and then override the thread name with the prefix and an ever increasing number
*/
class NamedThreadFactory(private val name:String, private val underlyingFactory: ThreadFactory) : ThreadFactory{
val threadNumber = AtomicInteger(1)
override fun newThread(runnable: Runnable?): Thread {
val thread = underlyingFactory.newThread(runnable)
thread.name = name + "-" + threadNumber.getAndIncrement()
return thread
}
}
/**
* Create a single thread executor with a NamedThreadFactory based on the default thread factory
* defined in java.util.concurrent.Executors
*/
fun newNamedSingleThreadExecutor(name: String): ExecutorService {
return Executors.newSingleThreadExecutor(NamedThreadFactory(name, Executors.defaultThreadFactory()))
}

View File

@ -33,8 +33,9 @@ import static net.corda.testing.TestConstants.*;
import static net.corda.testing.node.MockServices.*;
import static org.assertj.core.api.Assertions.*;
public class VaultQueryJavaTests extends TestDependencyInjectionBase {
public class VaultQueryJavaTests {
@Rule
public SerializationEnvironmentRule testSerialization = new SerializationEnvironmentRule();
private MockServices services;
private MockServices issuerServices;
private VaultService vaultService;

View File

@ -13,7 +13,6 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.cordapp.DummyRPCFlow
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Before
@ -82,7 +81,7 @@ class CordaServiceTest {
@Before
fun start() {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.node.internal","net.corda.finance"))
notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name, validating = true)
notaryNode = mockNet.createNotaryNode()
nodeA = mockNet.createNode()
mockNet.startNodes()
}

View File

@ -4,7 +4,6 @@ import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.TopicStringValidator
import net.corda.node.services.messaging.createMessage
import net.corda.testing.node.MockNetwork
import net.corda.testing.resetTestSerialization
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -23,11 +22,7 @@ class InMemoryMessagingTests {
@After
fun tearDown() {
if (mockNet.nodes.isNotEmpty()) {
mockNet.stopNodes()
} else {
resetTestSerialization()
}
mockNet.stopNodes()
}
@Test

View File

@ -16,6 +16,7 @@ import net.corda.node.internal.StartedNode
import net.corda.testing.*
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
@ -37,10 +38,10 @@ class NotaryChangeTests {
@Before
fun setUp() {
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
oldNotaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name)
oldNotaryNode = mockNet.createNotaryNode(MockNodeParameters(legalName = DUMMY_NOTARY.name))
clientNodeA = mockNet.createNode()
clientNodeB = mockNet.createNode()
newNotaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name.copy(organisation = "Dummy Notary 2"))
newNotaryNode = mockNet.createNotaryNode(MockNodeParameters(legalName = DUMMY_NOTARY.name.copy(organisation = "Dummy Notary 2")))
mockNet.runNetwork() // Clear network map registration messages
oldNotaryParty = newNotaryNode.services.networkMapCache.getNotary(DUMMY_NOTARY_SERVICE_NAME)!!
newNotaryParty = newNotaryNode.services.networkMapCache.getNotary(DUMMY_NOTARY_SERVICE_NAME.copy(organisation = "Dummy Notary 2"))!!

View File

@ -0,0 +1,20 @@
package net.corda.node.services
import net.corda.nodeapi.User
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
class RPCUserServiceTest {
@Test
fun `Artemis special characters not permitted in RPC usernames`() {
assertThatThrownBy { configWithRPCUsername("user.1") }.hasMessageContaining(".")
assertThatThrownBy { configWithRPCUsername("user*1") }.hasMessageContaining("*")
assertThatThrownBy { configWithRPCUsername("user#1") }.hasMessageContaining("#")
}
private fun configWithRPCUsername(username: String) {
RPCUserServiceImpl(listOf(User(username, "password", setOf())))
}
}

View File

@ -1,47 +0,0 @@
package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.net.URL
import java.nio.file.Paths
class FullNodeConfigurationTest {
@Test
fun `Artemis special characters not permitted in RPC usernames`() {
val testConfiguration = FullNodeConfiguration(
baseDirectory = Paths.get("."),
myLegalName = ALICE.name,
networkMapService = null,
emailAddress = "",
keyStorePassword = "cordacadevpass",
trustStorePassword = "trustpass",
dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation),
database = makeTestDatabaseProperties(),
certificateSigningService = URL("http://localhost"),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,
p2pAddress = NetworkHostAndPort("localhost", 0),
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
notary = null,
certificateChainCheckPolicies = emptyList(),
devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
additionalNodeInfoPollingFrequencyMsec = 5.seconds.toMillis(),
relay = null)
fun configWithRPCUsername(username: String) {
testConfiguration.copy(rpcUsers = listOf(User(username, "pass", emptySet())))
}
assertThatThrownBy { configWithRPCUsername("user.1") }.hasMessageContaining(".")
assertThatThrownBy { configWithRPCUsername("user*1") }.hasMessageContaining("*")
assertThatThrownBy { configWithRPCUsername("user#1") }.hasMessageContaining("#")
}
}

View File

@ -0,0 +1,55 @@
package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.ALICE
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.net.URL
import java.nio.file.Paths
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class NodeConfigurationImplTest {
@Test
fun `can't have dev mode options if not in dev mode`() {
val debugOptions = DevModeOptions()
configDebugOptions(true, debugOptions)
configDebugOptions(true, null)
assertThatThrownBy { configDebugOptions(false, debugOptions) }.hasMessageMatching("Cannot use devModeOptions outside of dev mode")
configDebugOptions(false, null)
}
@Test
fun `check devModeOptions flag helper`() {
assertFalse { configDebugOptions(true, null).devModeOptions?.disableCheckpointChecker == true }
assertFalse { configDebugOptions(true, DevModeOptions()).devModeOptions?.disableCheckpointChecker == true }
assertFalse { configDebugOptions(true, DevModeOptions(false)).devModeOptions?.disableCheckpointChecker == true }
assertTrue { configDebugOptions(true, DevModeOptions(true)).devModeOptions?.disableCheckpointChecker == true }
}
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?) : NodeConfiguration {
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
}
private val testConfiguration = NodeConfigurationImpl(
baseDirectory = Paths.get("."),
myLegalName = ALICE.name,
emailAddress = "",
keyStorePassword = "cordacadevpass",
trustStorePassword = "trustpass",
dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation),
database = makeTestDatabaseProperties(),
certificateSigningService = URL("http://localhost"),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,
p2pAddress = NetworkHostAndPort("localhost", 0),
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
notary = null,
certificateChainCheckPolicies = emptyList(),
devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)))
}

View File

@ -12,6 +12,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -41,6 +42,7 @@ import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.nio.file.Paths
import java.security.PublicKey
@ -56,6 +58,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
private val myInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), listOf(DUMMY_IDENTITY_1), 1, serial = 1L)
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private val realClock: Clock = Clock.systemUTC()
private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone)
private val testClock = TestClock(stoppedClock)
@ -85,7 +90,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Before
fun setup() {
initialiseTestSerialization()
countDown = CountDownLatch(1)
smmHasRemovedAllFlows = CountDownLatch(1)
calls = 0
@ -107,11 +111,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
doReturn(myInfo).whenever(it).myInfo
doReturn(kms).whenever(it).keyManagementService
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider
doCallRealMethod().whenever(it).recordTransactions(any<SignedTransaction>())
doCallRealMethod().whenever(it).recordTransactions(any<Boolean>(), any<SignedTransaction>())
doCallRealMethod().whenever(it).recordTransactions(any(), any<Iterable<SignedTransaction>>())
doCallRealMethod().whenever(it).recordTransactions(any<StatesToRecord>(), any())
doCallRealMethod().whenever(it).recordTransactions(any<Iterable<SignedTransaction>>())
doCallRealMethod().whenever(it).recordTransactions(any<SignedTransaction>(), anyVararg())
doReturn(NodeVaultService(testClock, kms, stateLoader, database.hibernateConfig)).whenever(it).vaultService
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
}
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database)
@ -135,11 +140,11 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
smmExecutor.shutdown()
smmExecutor.awaitTermination(60, TimeUnit.SECONDS)
database.close()
resetTestSerialization()
}
// Ignore IntelliJ when it says these properties can be private, if they are we cannot serialise them
// in AMQP.
@Suppress("MemberVisibilityCanPrivate")
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState {
override val participants: List<AbstractParty>
get() = listOf(myIdentity)

View File

@ -16,7 +16,6 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.chooseIdentity
import net.corda.testing.contracts.DummyContract
import net.corda.testing.dummyCommand
@ -95,7 +94,7 @@ class ScheduledFlowTests {
@Before
fun setup() {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.testing.contracts"))
notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name)
notaryNode = mockNet.createNotaryNode()
val a = mockNet.createUnstartedNode()
val b = mockNet.createUnstartedNode()

View File

@ -39,7 +39,10 @@ import kotlin.test.assertEquals
import kotlin.test.assertNull
//TODO This needs to be merged into P2PMessagingTest as that creates a more realistic environment
class ArtemisMessagingTests : TestDependencyInjectionBase() {
class ArtemisMessagingTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()

View File

@ -10,6 +10,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.utilities.CertificateType
import net.corda.node.utilities.X509Utilities
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.TestDependencyInjectionBase
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
@ -23,6 +24,7 @@ import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.servlet.ServletContainer
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.InputStream
@ -37,7 +39,10 @@ import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.ok
import kotlin.test.assertEquals
class HTTPNetworkMapClientTest : TestDependencyInjectionBase() {
class HTTPNetworkMapClientTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private lateinit var server: Server
private lateinit var networkMapClient: NetworkMapClient

View File

@ -8,13 +8,14 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.LogHelper
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
import net.corda.testing.SerializationEnvironmentRule
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
internal fun CheckpointStorage.checkpoints(): List<Checkpoint> {
@ -26,7 +27,10 @@ internal fun CheckpointStorage.checkpoints(): List<Checkpoint> {
return checkpoints
}
class DBCheckpointStorageTests : TestDependencyInjectionBase() {
class DBCheckpointStorageTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var checkpointStorage: DBCheckpointStorage
lateinit var database: CordaPersistence

View File

@ -5,13 +5,13 @@ import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultService
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.CordaPersistence
@ -24,11 +24,15 @@ import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class DBTransactionStorageTests : TestDependencyInjectionBase() {
class DBTransactionStorageTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var database: CordaPersistence
lateinit var transactionStorage: DBTransactionStorage
lateinit var services: MockServices
@ -40,7 +44,6 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), ::makeTestIdentityService)
database.transaction {
services = object : MockServices(BOB_KEY) {
override val vaultService: VaultServiceInternal
get() {
@ -54,7 +57,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
vaultService.notifyAll(StatesToRecord.ONLY_RELEVANT, txs.map { it.tx })
}
}
}

View File

@ -5,14 +5,15 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.node.StatesToRecord
import net.corda.core.utilities.toBase58String
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.transactions.SignedTransaction
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
@ -43,10 +44,7 @@ import net.corda.testing.schemas.DummyLinearStateSchemaV2
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.hibernate.SessionFactory
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import org.junit.*
import java.math.BigDecimal
import java.time.Instant
import java.util.*
@ -54,8 +52,10 @@ import javax.persistence.EntityManager
import javax.persistence.Tuple
import javax.persistence.criteria.CriteriaBuilder
class HibernateConfigurationTest : TestDependencyInjectionBase() {
class HibernateConfigurationTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var services: MockServices
lateinit var issuerServices: MockServices
lateinit var database: CordaPersistence
@ -82,12 +82,12 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
hibernateConfig = database.hibernateConfig
services = object : MockServices(cordappPackages, BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
override val vaultService = makeVaultService(database.hibernateConfig)
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
vaultService.notifyAll(statesToRecord, txs.map { it.tx })
}
override fun jdbcSession() = database.createSession()

View File

@ -146,16 +146,6 @@ class FlowFrameworkTests {
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@Test
fun `flow added before network map does run after init`() {
val charlieNode = mockNet.createNode() //create vanilla node
val flow = NoOpFlow()
charlieNode.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
assertEquals(true, flow.flowStarted) // Now we should have run the flow
}
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }

View File

@ -18,14 +18,18 @@ import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.CompletableFuture
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class DistributedImmutableMapTests : TestDependencyInjectionBase() {
class DistributedImmutableMapTests {
data class Member(val client: CopycatClient, val server: CopycatServer)
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var cluster: List<Member>
lateinit var transaction: DatabaseTransaction
private val databases: MutableList<CordaPersistence> = mutableListOf()

View File

@ -37,7 +37,7 @@ class NotaryServiceTests {
@Before
fun setup() {
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
val notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name, validating = false)
val notaryNode = mockNet.createNotaryNode(validating = false)
aliceServices = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME)).services
mockNet.runNetwork() // Clear network map registration messages
notaryServices = notaryNode.services

View File

@ -10,11 +10,15 @@ import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class PersistentUniquenessProviderTests : TestDependencyInjectionBase() {
class PersistentUniquenessProviderTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
val identity = MEGA_CORP
val txID = SecureHash.randomSHA256()

View File

@ -37,7 +37,7 @@ class ValidatingNotaryServiceTests {
@Before
fun setup() {
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
val notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name)
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME))
mockNet.runNetwork() // Clear network map registration messages
notaryServices = notaryNode.services

View File

@ -1,16 +1,15 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.contracts.Issued
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.*
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.QueryCriteria.*
import net.corda.core.transactions.NotaryChangeWireTransaction
@ -30,11 +29,11 @@ import net.corda.node.utilities.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import rx.observers.TestSubscriber
import java.math.BigDecimal
@ -45,11 +44,14 @@ import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class NodeVaultServiceTest : TestDependencyInjectionBase() {
class NodeVaultServiceTest {
companion object {
private val cordappPackages = listOf("net.corda.finance.contracts.asset", CashSchemaV1::class.packageName)
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var services: MockServices
private lateinit var issuerServices: MockServices
val vaultService get() = services.vaultService as NodeVaultService
@ -58,8 +60,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
@Before
fun setUp() {
LogHelper.setLevel(NodeVaultService::class)
val databaseAndServices = makeTestDatabaseAndMockServices(keys = listOf(BOC_KEY, DUMMY_CASH_ISSUER_KEY),
cordappPackages = cordappPackages)
val databaseAndServices = MockServices.makeTestDatabaseAndMockServices(
keys = listOf(BOC_KEY, DUMMY_CASH_ISSUER_KEY),
cordappPackages = cordappPackages
)
database = databaseAndServices.first
services = databaseAndServices.second
issuerServices = MockServices(cordappPackages, DUMMY_CASH_ISSUER_KEY, BOC_KEY)
@ -102,10 +106,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val originalVault = vaultService
val services2 = object : MockServices() {
override val vaultService: NodeVaultService get() = originalVault
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
vaultService.notify(statesToRecord, stx.tx)
}
}
}
@ -512,14 +516,14 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
}.toWireTransaction(services)
val cashState = StateAndRef(issueTx.outputs.single(), StateRef(issueTx.id, 0))
database.transaction { service.notify(issueTx) }
database.transaction { service.notify(StatesToRecord.ONLY_RELEVANT, issueTx) }
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null)
database.transaction {
val moveTx = TransactionBuilder(services.myInfo.chooseIdentity()).apply {
Cash.generateSpend(services, this, Amount(1000, GBP), thirdPartyIdentity)
}.toWireTransaction(services)
service.notify(moveTx)
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
}
val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null)
@ -556,7 +560,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val cashStateWithNewNotary = StateAndRef(initialCashState.state.copy(notary = newNotary), StateRef(changeNotaryTx.id, 0))
database.transaction {
service.notifyAll(listOf(issueStx.tx, changeNotaryTx))
service.notifyAll(StatesToRecord.ONLY_RELEVANT, listOf(issueStx.tx, changeNotaryTx))
}
// Move cash
@ -567,7 +571,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
}
database.transaction {
service.notify(moveTx)
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
}
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null)
@ -577,4 +581,32 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val observedUpdates = vaultSubscriber.onNextEvents
assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate))
}
@Test
fun observerMode() {
fun countCash(): Long {
return database.transaction {
vaultService.queryBy(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(), PageSpecification(1)).totalStatesAvailable
}
}
val currentCashStates = countCash()
// Send some minimalist dummy transaction.
val txb = TransactionBuilder(DUMMY_NOTARY)
txb.addOutputState(Cash.State(MEGA_CORP.ref(0), 100.DOLLARS, MINI_CORP), Cash::class.java.name)
txb.addCommand(Cash.Commands.Move(), MEGA_CORP_PUBKEY)
val wtx = txb.toWireTransaction(services)
database.transaction {
vaultService.notify(StatesToRecord.ONLY_RELEVANT, wtx)
}
// Check that it was ignored as irrelevant.
assertEquals(currentCashStates, countCash())
// Now try again and check it was accepted.
database.transaction {
vaultService.notify(StatesToRecord.ALL_VISIBLE, wtx)
}
assertEquals(currentCashStates + 1, countCash())
}
}

View File

@ -46,7 +46,10 @@ import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import java.util.*
class VaultQueryTests : TestDependencyInjectionBase() {
class VaultQueryTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private val cordappPackages = setOf(
"net.corda.testing.contracts", "net.corda.finance.contracts",
CashSchemaV1::class.packageName, CommercialPaperSchemaV1::class.packageName, DummyLinearStateSchemaV1::class.packageName).toMutableList()

View File

@ -26,6 +26,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
@ -34,11 +35,14 @@ import kotlin.test.assertEquals
// TODO: Move this to the cash contract tests once mock services are further split up.
class VaultWithCashTest : TestDependencyInjectionBase() {
class VaultWithCashTest {
companion object {
private val cordappPackages = listOf("net.corda.testing.contracts", "net.corda.finance.contracts.asset", CashSchemaV1::class.packageName)
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
lateinit var services: MockServices
lateinit var issuerServices: MockServices
val vaultService: VaultService get() = services.vaultService