mirror of
https://github.com/corda/corda.git
synced 2025-06-16 22:28:15 +00:00
Merge branch 'master' into shams-merge-master-041217
# Conflicts: # node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt # samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/BankOfCordaCordform.kt # testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt # testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt # testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt # testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt # testing/node-driver/src/main/kotlin/net/corda/testing/node/NotarySpec.kt # verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt
This commit is contained in:
@ -8,7 +8,7 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
||||
import net.corda.testing.driver.driver
|
||||
|
@ -8,7 +8,7 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.testing.chooseIdentity
|
||||
|
@ -13,7 +13,7 @@ import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.driver
|
||||
|
@ -9,7 +9,7 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.driver.driver
|
||||
import org.bouncycastle.util.io.Streams
|
||||
|
@ -13,7 +13,7 @@ import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.driver
|
||||
|
@ -49,7 +49,7 @@ class NodeInfoWatcherTest {
|
||||
fun start() {
|
||||
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
|
||||
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
|
||||
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler)
|
||||
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
|
||||
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
|
||||
}
|
||||
|
||||
|
@ -4,15 +4,15 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.aliceAndBob
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.dummyCommand
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@ -65,15 +65,16 @@ class LargeTransactionsTest {
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 3)
|
||||
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts")) {
|
||||
val (alice, _) = aliceAndBob()
|
||||
alice.useRPC {
|
||||
val hash1 = it.uploadAttachment(bigFile1.inputStream)
|
||||
val hash2 = it.uploadAttachment(bigFile2.inputStream)
|
||||
val hash3 = it.uploadAttachment(bigFile3.inputStream)
|
||||
val hash4 = it.uploadAttachment(bigFile4.inputStream)
|
||||
val rpcUser = User("admin", "admin", setOf("ALL"))
|
||||
val (alice, _) = listOf(ALICE_NAME, BOB_NAME).map { startNode(providedName = it, rpcUsers = listOf(rpcUser)) }.transpose().getOrThrow()
|
||||
alice.rpcClientToNode().use(rpcUser.username, rpcUser.password) {
|
||||
val hash1 = it.proxy.uploadAttachment(bigFile1.inputStream)
|
||||
val hash2 = it.proxy.uploadAttachment(bigFile2.inputStream)
|
||||
val hash3 = it.proxy.uploadAttachment(bigFile3.inputStream)
|
||||
val hash4 = it.proxy.uploadAttachment(bigFile4.inputStream)
|
||||
assertEquals(hash1, bigFile1.sha256)
|
||||
// Should not throw any exceptions.
|
||||
it.startFlow(::SendLargeTransactionFlow, hash1, hash2, hash3, hash4).returnValue.get()
|
||||
it.proxy.startFlow(::SendLargeTransactionFlow, hash1, hash2, hash3, hash4).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import net.corda.core.internal.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MINI_CORP
|
||||
|
@ -1,6 +1,6 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import org.junit.Test
|
||||
|
||||
|
@ -22,8 +22,8 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATI
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.internal.NodeBasedTest
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
|
@ -19,7 +19,7 @@ import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.chooseIdentity
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Assume.assumeFalse
|
||||
|
@ -66,6 +66,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
import java.io.NotSerializableException
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
@ -232,7 +233,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
val networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
@ -258,6 +259,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be [rx.schedulers.Schedulers.io] for production,
|
||||
* or [rx.internal.schedulers.CachedThreadScheduler] (with shutdown registered with [runOnStop]) for shared-JVM testing.
|
||||
*/
|
||||
protected abstract fun getRxIoScheduler(): Scheduler
|
||||
|
||||
open fun startShell(rpcOps: CordaRPCOps) {
|
||||
InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database)
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.schedulers.Schedulers
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
@ -46,7 +47,7 @@ import kotlin.system.exitProcess
|
||||
*/
|
||||
open class Node(configuration: NodeConfiguration,
|
||||
versionInfo: VersionInfo,
|
||||
val initialiseSerialization: Boolean = true,
|
||||
private val initialiseSerialization: Boolean = true,
|
||||
cordappLoader: CordappLoader = makeCordappLoader(configuration)
|
||||
) : AbstractNode(configuration, createClock(configuration), versionInfo, cordappLoader) {
|
||||
companion object {
|
||||
@ -299,6 +300,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
return started
|
||||
}
|
||||
|
||||
override fun getRxIoScheduler() = Schedulers.io()!!
|
||||
private fun initialiseSerialization() {
|
||||
val classloader = cordappLoader.appClassLoader
|
||||
nodeSerializationEnv = SerializationEnvironmentImpl(
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
|
||||
/**
|
||||
* Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User]
|
||||
|
@ -8,7 +8,7 @@ import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SignatureScheme
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import org.bouncycastle.asn1.x509.GeneralName
|
||||
import org.bouncycastle.asn1.x509.GeneralSubtree
|
||||
|
@ -6,9 +6,9 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.messaging.CertificateChainCheckPolicy
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.config.parseAs
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
@ -6,7 +6,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
||||
|
@ -33,6 +33,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREF
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
@ -43,7 +44,9 @@ import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.*
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||
import org.apache.activemq.artemis.spi.core.remoting.*
|
||||
@ -139,8 +142,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
// Artemis IO errors
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
private fun configureAndStartServer() {
|
||||
val artemisConfig = createArtemisConfig()
|
||||
val securityManager = createArtemisSecurityManager()
|
||||
val (artemisConfig, securityPlugin) = createArtemisConfig()
|
||||
val securityManager = createArtemisSecurityManager(securityPlugin)
|
||||
activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply {
|
||||
// Throw any exceptions which are detected during startup
|
||||
registerActivationFailureListener { exception -> throw exception }
|
||||
@ -156,7 +159,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
|
||||
private fun createArtemisConfig() = ConfigurationImpl().apply {
|
||||
val artemisDir = config.baseDirectory / "artemis"
|
||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||
journalDirectory = (artemisDir / "journal").toString()
|
||||
@ -208,8 +211,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
||||
}
|
||||
)
|
||||
configureAddressSecurity()
|
||||
}
|
||||
}.configureAddressSecurity()
|
||||
|
||||
private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
|
||||
return CoreQueueConfiguration().apply {
|
||||
@ -227,7 +229,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
* 3. RPC users. These are only given sufficient access to perform RPC with us.
|
||||
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
|
||||
*/
|
||||
private fun ConfigurationImpl.configureAddressSecurity() {
|
||||
private fun ConfigurationImpl.configureAddressSecurity() : Pair<Configuration, LoginListener> {
|
||||
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
|
||||
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
|
||||
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
|
||||
@ -236,13 +238,22 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
|
||||
// Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues
|
||||
// and stealing RPC responses.
|
||||
for ((username) in userService.users) {
|
||||
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
|
||||
nodeInternalRole,
|
||||
restrictedRole("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
|
||||
val rolesAdderOnLogin = RolesAdderOnLogin { username ->
|
||||
Pair(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#",
|
||||
setOf(
|
||||
nodeInternalRole,
|
||||
restrictedRole(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username",
|
||||
consume = true,
|
||||
createNonDurableQueue = true,
|
||||
deleteNonDurableQueue = true)))
|
||||
}
|
||||
securitySettingPlugins.add(rolesAdderOnLogin)
|
||||
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
|
||||
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.#"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
|
||||
val onLoginListener = { username: String -> rolesAdderOnLogin.onLogin(username) }
|
||||
return Pair(this, onLoginListener)
|
||||
}
|
||||
|
||||
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
|
||||
@ -253,7 +264,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
|
||||
private fun createArtemisSecurityManager(loginListener: LoginListener): ActiveMQJAASSecurityManager {
|
||||
val keyStore = loadKeyStore(config.sslKeystore, config.keyStorePassword)
|
||||
val trustStore = loadKeyStore(config.trustStoreFile, config.trustStorePassword)
|
||||
|
||||
@ -270,6 +281,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
// Override to make it work with our login module
|
||||
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
|
||||
val options = mapOf(
|
||||
LoginListener::javaClass.name to loginListener,
|
||||
RPCUserService::class.java.name to userService,
|
||||
NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks)
|
||||
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
|
||||
@ -546,6 +558,7 @@ class NodeLoginModule : LoginModule {
|
||||
private lateinit var subject: Subject
|
||||
private lateinit var callbackHandler: CallbackHandler
|
||||
private lateinit var userService: RPCUserService
|
||||
private lateinit var loginListener: LoginListener
|
||||
private lateinit var peerCertCheck: CertificateChainCheckPolicy.Check
|
||||
private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check
|
||||
private lateinit var verifierCertCheck: CertificateChainCheckPolicy.Check
|
||||
@ -555,6 +568,7 @@ class NodeLoginModule : LoginModule {
|
||||
this.subject = subject
|
||||
this.callbackHandler = callbackHandler
|
||||
userService = options[RPCUserService::class.java.name] as RPCUserService
|
||||
loginListener = options[LoginListener::javaClass.name] as LoginListener
|
||||
val certChainChecks: Map<String, CertificateChainCheckPolicy.Check> = uncheckedCast(options[CERT_CHAIN_CHECKS_OPTION_NAME])
|
||||
peerCertCheck = certChainChecks[PEER_ROLE]!!
|
||||
nodeCertCheck = certChainChecks[NODE_ROLE]!!
|
||||
@ -622,6 +636,7 @@ class NodeLoginModule : LoginModule {
|
||||
// TODO Retrieve client IP address to include in exception message
|
||||
throw FailedLoginException("Password for user $username does not match")
|
||||
}
|
||||
loginListener(username)
|
||||
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
|
||||
principals += RolePrincipal("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username") // This enables the RPC client to receive responses
|
||||
return username
|
||||
@ -676,3 +691,40 @@ class NodeLoginModule : LoginModule {
|
||||
loginSucceeded = false
|
||||
}
|
||||
}
|
||||
|
||||
typealias LoginListener = (String) -> Unit
|
||||
typealias RolesRepository = HierarchicalRepository<MutableSet<Role>>
|
||||
|
||||
/**
|
||||
* Helper class to dynamically assign security roles to RPC users
|
||||
* on their authentication. This object is plugged into the server
|
||||
* as [SecuritySettingPlugin]. It responds to authentication events
|
||||
* from [NodeLoginModule] by adding the address -> roles association
|
||||
* generated by the given [source], unless already done before.
|
||||
*/
|
||||
private class RolesAdderOnLogin(val source: (String) -> Pair<String, Set<Role>>)
|
||||
: SecuritySettingPlugin {
|
||||
|
||||
// Artemis internal container storing roles association
|
||||
private lateinit var repository: RolesRepository
|
||||
|
||||
fun onLogin(username: String) {
|
||||
val (address, roles) = source(username)
|
||||
val entry = repository.getMatch(address)
|
||||
if (entry == null || entry.isEmpty()) {
|
||||
repository.addMatch(address, roles.toMutableSet())
|
||||
}
|
||||
}
|
||||
|
||||
// Initializer called by the Artemis framework
|
||||
override fun setSecurityRepository(repository: RolesRepository) {
|
||||
this.repository = repository
|
||||
}
|
||||
|
||||
// Part of SecuritySettingPlugin interface which is no-op in this case
|
||||
override fun stop() = this
|
||||
|
||||
override fun init(options: MutableMap<String, String>?) = this
|
||||
|
||||
override fun getSecurityRoles() = null
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.getX509Certificate
|
||||
|
@ -30,6 +30,7 @@ import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
@ -11,7 +11,7 @@ import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
|
@ -12,7 +12,6 @@ import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.NodeInfoFilesCopier
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import rx.schedulers.Schedulers
|
||||
import java.io.IOException
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
@ -31,9 +30,8 @@ import kotlin.streams.toList
|
||||
*/
|
||||
// TODO: Use NIO watch service instead?
|
||||
class NodeInfoWatcher(private val nodePath: Path,
|
||||
private val pollInterval: Duration = 5.seconds,
|
||||
private val scheduler: Scheduler = Schedulers.io()) {
|
||||
|
||||
private val scheduler: Scheduler,
|
||||
private val pollInterval: Duration = 5.seconds) {
|
||||
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
private val processedNodeInfoFiles = mutableSetOf<Path>()
|
||||
private val _processedNodeInfoHashes = mutableSetOf<SecureHash>()
|
||||
|
@ -29,8 +29,8 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.RaftConfig
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import java.nio.file.Path
|
||||
|
@ -653,13 +653,13 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
// wants to sell to Bob.
|
||||
val eb1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
// Issued money to itself.
|
||||
output(Cash.PROGRAM_ID, "elbonian money 1", notary = notary) { 800.DOLLARS.CASH issuedBy issuer ownedBy interimOwner }
|
||||
output(Cash.PROGRAM_ID, "elbonian money 2", notary = notary) { 1000.DOLLARS.CASH issuedBy issuer ownedBy interimOwner }
|
||||
output(Cash.PROGRAM_ID, "elbonian money 1", notary = notary, contractState = 800.DOLLARS.CASH issuedBy issuer ownedBy interimOwner)
|
||||
output(Cash.PROGRAM_ID, "elbonian money 2", notary = notary, contractState = 1000.DOLLARS.CASH issuedBy issuer ownedBy interimOwner)
|
||||
if (!withError) {
|
||||
command(issuer.party.owningKey) { Cash.Commands.Issue() }
|
||||
command(issuer.party.owningKey, Cash.Commands.Issue())
|
||||
} else {
|
||||
// Put a broken command on so at least a signature is created
|
||||
command(issuer.party.owningKey) { Cash.Commands.Move() }
|
||||
command(issuer.party.owningKey, Cash.Commands.Move())
|
||||
}
|
||||
timeWindow(TEST_TX_TIME)
|
||||
if (withError) {
|
||||
@ -672,16 +672,16 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
// Bob gets some cash onto the ledger from BoE
|
||||
val bc1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
input("elbonian money 1")
|
||||
output(Cash.PROGRAM_ID, "bob cash 1", notary = notary) { 800.DOLLARS.CASH issuedBy issuer ownedBy owner }
|
||||
command(interimOwner.owningKey) { Cash.Commands.Move() }
|
||||
output(Cash.PROGRAM_ID, "bob cash 1", notary = notary, contractState = 800.DOLLARS.CASH issuedBy issuer ownedBy owner)
|
||||
command(interimOwner.owningKey, Cash.Commands.Move())
|
||||
this.verifies()
|
||||
}
|
||||
|
||||
val bc2 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
input("elbonian money 2")
|
||||
output(Cash.PROGRAM_ID, "bob cash 2", notary = notary) { 300.DOLLARS.CASH issuedBy issuer ownedBy owner }
|
||||
output(Cash.PROGRAM_ID, notary = notary) { 700.DOLLARS.CASH issuedBy issuer ownedBy interimOwner } // Change output.
|
||||
command(interimOwner.owningKey) { Cash.Commands.Move() }
|
||||
output(Cash.PROGRAM_ID, "bob cash 2", notary = notary, contractState = 300.DOLLARS.CASH issuedBy issuer ownedBy owner)
|
||||
output(Cash.PROGRAM_ID, notary = notary, contractState = 700.DOLLARS.CASH issuedBy issuer ownedBy interimOwner) // Change output.
|
||||
command(interimOwner.owningKey, Cash.Commands.Move())
|
||||
this.verifies()
|
||||
}
|
||||
|
||||
@ -697,10 +697,9 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
attachmentID: SecureHash?,
|
||||
notary: Party): Pair<Vault<ContractState>, List<WireTransaction>> {
|
||||
val ap = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
output(CommercialPaper.CP_PROGRAM_ID, "alice's paper", notary = notary) {
|
||||
CommercialPaper.State(issuer, owner, amount, TEST_TX_TIME + 7.days)
|
||||
}
|
||||
command(issuer.party.owningKey) { CommercialPaper.Commands.Issue() }
|
||||
output(CommercialPaper.CP_PROGRAM_ID, "alice's paper", notary = notary,
|
||||
contractState = CommercialPaper.State(issuer, owner, amount, TEST_TX_TIME + 7.days))
|
||||
command(issuer.party.owningKey, CommercialPaper.Commands.Issue())
|
||||
if (!withError)
|
||||
timeWindow(time = TEST_TX_TIME)
|
||||
if (attachmentID != null)
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.services
|
||||
|
||||
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.nodeapi.config.toProperties
|
||||
import net.corda.nodeapi.internal.config.toProperties
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
|
@ -12,9 +12,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.node.internal.FlowStarterImpl
|
||||
@ -44,7 +42,6 @@ import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.nio.file.Paths
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CountDownLatch
|
||||
@ -105,13 +102,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService
|
||||
doReturn(validatedTransactions).whenever(it).validatedTransactions
|
||||
doReturn(NetworkMapCacheImpl(MockNetworkMapCache(database), identityService)).whenever(it).networkMapCache
|
||||
doCallRealMethod().whenever(it).signInitialTransaction(any(), any<PublicKey>())
|
||||
doReturn(myInfo).whenever(it).myInfo
|
||||
doReturn(kms).whenever(it).keyManagementService
|
||||
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider
|
||||
doCallRealMethod().whenever(it).recordTransactions(any<StatesToRecord>(), any())
|
||||
doCallRealMethod().whenever(it).recordTransactions(any<Iterable<SignedTransaction>>())
|
||||
doCallRealMethod().whenever(it).recordTransactions(any<SignedTransaction>(), anyVararg())
|
||||
doReturn(NodeVaultService(testClock, kms, stateLoader, database.hibernateConfig)).whenever(it).vaultService
|
||||
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
|
||||
|
||||
|
@ -52,7 +52,7 @@ class NetworkMapUpdaterTest {
|
||||
val networkMapClient = mock<NetworkMapClient>()
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
|
||||
// Publish node info for the first time.
|
||||
@ -101,7 +101,7 @@ class NetworkMapUpdaterTest {
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
|
||||
// Test adding new node.
|
||||
@ -155,7 +155,7 @@ class NetworkMapUpdaterTest {
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
|
||||
// Add all nodes.
|
||||
@ -199,7 +199,7 @@ class NetworkMapUpdaterTest {
|
||||
val networkMapCache = getMockNetworkMapCache()
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null)
|
||||
|
||||
// Not subscribed yet.
|
||||
|
Reference in New Issue
Block a user