mirror of
https://github.com/corda/corda.git
synced 2024-12-18 12:46:29 +00:00
[CORDA:936]: Enable RPC layer to work with SSL
This commit is contained in:
parent
70f1fdeb2b
commit
142f52fa82
15
.idea/runConfigurations/Explorer___demo_nodes.xml
generated
15
.idea/runConfigurations/Explorer___demo_nodes.xml
generated
@ -1,15 +0,0 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Explorer - demo nodes" type="JetRunConfigurationType" factoryName="Kotlin" singleton="true">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="net.corda.explorer.MainKt" />
|
||||
<option name="VM_PARAMETERS" value="" />
|
||||
<option name="PROGRAM_PARAMETERS" value="" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
|
||||
<option name="ALTERNATIVE_JRE_PATH" value="1.8" />
|
||||
<option name="PASS_PARENT_ENVS" value="true" />
|
||||
<module name="explorer_main" />
|
||||
<envs />
|
||||
<method />
|
||||
</configuration>
|
||||
</component>
|
@ -1,15 +0,0 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Explorer - demo nodes (simulation)" type="JetRunConfigurationType" factoryName="Kotlin" singleton="true">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="net.corda.explorer.MainKt" />
|
||||
<option name="VM_PARAMETERS" value="" />
|
||||
<option name="PROGRAM_PARAMETERS" value="-S" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
|
||||
<option name="ALTERNATIVE_JRE_PATH" value="1.8" />
|
||||
<option name="PASS_PARENT_ENVS" value="true" />
|
||||
<module name="explorer_main" />
|
||||
<envs />
|
||||
<method />
|
||||
</configuration>
|
||||
</component>
|
@ -75,7 +75,7 @@ class NodeMonitorModelTest {
|
||||
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
|
||||
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
|
||||
|
||||
monitor.register(aliceNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
|
||||
monitor.register(aliceNodeHandle.configuration.rpcOptions.address!!, cashUser.username, cashUser.password)
|
||||
rpc = monitor.proxyObservable.value!!
|
||||
notaryParty = defaultNotaryIdentity
|
||||
|
||||
@ -83,7 +83,7 @@ class NodeMonitorModelTest {
|
||||
bobNode = bobNodeHandle.nodeInfo
|
||||
val monitorBob = NodeMonitorModel()
|
||||
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
|
||||
monitorBob.register(bobNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
|
||||
monitorBob.register(bobNodeHandle.configuration.rpcOptions.address!!, cashUser.username, cashUser.password)
|
||||
rpcBob = monitorBob.proxyObservable.value!!
|
||||
runTest()
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
node = startNode(ALICE_NAME, 1, singletonList(rpcUser));
|
||||
client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcAddress()));
|
||||
client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcOptions().getAddress()));
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -53,7 +53,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
|
||||
@Before
|
||||
fun setUp() {
|
||||
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
|
||||
client = CordaRPCClient(node.internals.configuration.rpcAddress!!)
|
||||
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!)
|
||||
identity = node.info.identityFromX500Name(ALICE_NAME)
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
||||
import java.time.Duration
|
||||
|
||||
@ -67,10 +68,12 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
|
||||
*
|
||||
* @param hostAndPort The network address to connect to.
|
||||
* @param configuration An optional configuration used to tweak client behaviour.
|
||||
* @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server.
|
||||
*/
|
||||
class CordaRPCClient @JvmOverloads constructor(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: SSLConfiguration? = null
|
||||
) {
|
||||
init {
|
||||
try {
|
||||
@ -85,7 +88,7 @@ class CordaRPCClient @JvmOverloads constructor(
|
||||
}
|
||||
|
||||
private val rpcClient = RPCClient<CordaRPCOps>(
|
||||
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = null),
|
||||
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
|
||||
configuration.toRpcClientConfiguration(),
|
||||
KRYO_RPC_CLIENT_CONTEXT
|
||||
)
|
||||
|
@ -6,6 +6,20 @@ from the previous milestone release.
|
||||
|
||||
UNRELEASED
|
||||
----------
|
||||
* Separated our pre-existing Artemis broker into an RPC broker and a P2P broker.
|
||||
|
||||
* Refactored ``NodeConfiguration`` to expose ``NodeRpcOptions`` (using top-level "rpcAddress" property still works with warning).
|
||||
|
||||
* Modified ``CordaRPCClient`` constructor to take a ``SSLConfiguration?`` additional parameter, defaulted to ``null``.
|
||||
|
||||
* Introduced ``CertificateChainCheckPolicy.UsernameMustMatchCommonName`` sub-type, allowing customers to optionally enforce username == CN condition on RPC SSL certificates.
|
||||
|
||||
* Modified ``DriverDSL`` and sub-types to allow specifying RPC settings for the Node.
|
||||
|
||||
* Modified the ``DriverDSL`` to start Cordformation nodes allowing automatic generation of "rpcSettings.adminAddress" in case "rcpSettings.useSsl" is ``false`` (the default).
|
||||
|
||||
* Introduced ``UnsafeCertificatesFactory`` allowing programmatic generation of X509 certificates for test purposes.
|
||||
|
||||
* JPA Mapping annotations for States extending ``CommonSchemaV1.LinearState`` and ``CommonSchemaV1.FungibleState`` on the
|
||||
`participants` collection need to be moved to the actual class. This allows to properly specify the unique table name per a collection.
|
||||
See: DummyDealStateSchemaV1.PersistentDummyDealState
|
||||
|
@ -245,6 +245,11 @@ Wire protocol
|
||||
-------------
|
||||
The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.
|
||||
|
||||
Wire security
|
||||
-------------
|
||||
``CordaRPCClient`` has an optional constructor parameter of type ``SSLConfiguration``, defaulted to ``null``, which allows
|
||||
communication with the node using SSL. Default ``null`` value means no SSL used in the context of RPC.
|
||||
|
||||
Whitelisting classes with the Corda node
|
||||
----------------------------------------
|
||||
CorDapps must whitelist any classes used over RPC with Corda's serialization framework, unless they are whitelisted by
|
||||
|
@ -38,7 +38,12 @@ Simple Notary configuration file.
|
||||
keyStorePassword : "cordacadevpass"
|
||||
trustStorePassword : "trustpass"
|
||||
p2pAddress : "localhost:12345"
|
||||
rpcAddress : "localhost:12346"
|
||||
rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
address : "my-corda-node:10003"
|
||||
adminAddress : "my-corda-node:10004"
|
||||
}
|
||||
webAddress : "localhost:12347"
|
||||
notary : {
|
||||
validating : false
|
||||
@ -87,7 +92,21 @@ path to the node's base directory.
|
||||
here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
|
||||
the node will try to auto-discover its public one.
|
||||
|
||||
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC.
|
||||
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.
|
||||
|
||||
:rpcSettings: Options for the RPC server.
|
||||
|
||||
:useSsl: (optional) boolean, indicates whether the node should require clients to use SSL for RPC connections, defaulted to ``false``.
|
||||
:standAloneBroker: (optional) boolean, indicates whether the node will connect to a standalone broker for RPC, defaulted to ``false``.
|
||||
:address: (optional) host and port for the RPC server binding, if any.
|
||||
:adminAddress: (optional) host and port for the RPC admin binding (only required when ``useSsl`` is ``false``, because the node connects to Artemis using SSL to ensure admin privileges are not accessible outside the node).
|
||||
:ssl: (optional) SSL settings for the RPC server.
|
||||
|
||||
:keyStorePassword: password for the key store.
|
||||
:trustStorePassword: password for the trust store.
|
||||
:certificatesDirectory: directory in which the stores will be searched, unless absolute paths are provided.
|
||||
:sslKeystore: absolute path to the ssl key store, defaulted to ``certificatesDirectory / "sslkeystore.jks"``.
|
||||
:trustStoreFile: absolute path to the trust store, defaulted to ``certificatesDirectory / "truststore.jks"``.
|
||||
|
||||
:security: Contains various nested fields controlling user authentication/authorization, in particular for RPC accesses. See
|
||||
:doc:`clientrpc` for details.
|
||||
|
@ -8,7 +8,12 @@ dataSourceProperties : {
|
||||
"dataSource.password" : ""
|
||||
}
|
||||
p2pAddress : "my-corda-node:10002"
|
||||
rpcAddress : "my-corda-node:10003"
|
||||
rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
address : "my-corda-node:10003"
|
||||
adminAddress : "my-corda-node:10004"
|
||||
}
|
||||
webAddress : "localhost:10004"
|
||||
rpcUsers : [
|
||||
{ username=user1, password=letmein, permissions=[ StartFlow.net.corda.protocols.CashProtocol ] }
|
||||
|
@ -23,15 +23,22 @@ fun SSLConfiguration.createDevKeyStores(legalName: CordaX500Name,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
|
||||
val (nodeCaCert, nodeCaKeyPair) = createDevNodeCa(intermediateCa, legalName)
|
||||
|
||||
loadOrCreateKeyStore(nodeKeystore, keyStorePassword).apply {
|
||||
addOrReplaceKey(
|
||||
X509Utilities.CORDA_CLIENT_CA,
|
||||
nodeCaKeyPair.private,
|
||||
keyStorePassword.toCharArray(),
|
||||
arrayOf(nodeCaCert, intermediateCa.certificate, rootCert))
|
||||
save(nodeKeystore, keyStorePassword)
|
||||
}
|
||||
createDevKeyStores(rootCert, intermediateCa, nodeCaCert, nodeCaKeyPair, legalName)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the node and SSL key stores needed by a node. The node key store will be populated with a node CA cert (using
|
||||
* the given legal name), and the SSL key store will store the TLS cert which is a sub-cert of the node CA.
|
||||
*/
|
||||
fun SSLConfiguration.createDevKeyStores(rootCert: X509Certificate, intermediateCa: CertificateAndKeyPair, nodeCaCert: X509Certificate, nodeCaKeyPair: KeyPair, legalName: CordaX500Name) {
|
||||
createNodeKeyStore(nodeCaCert, nodeCaKeyPair, intermediateCa, rootCert)
|
||||
createSslKeyStore(nodeCaCert, nodeCaKeyPair, legalName, intermediateCa, rootCert)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the SSL key store needed by a node.
|
||||
*/
|
||||
fun SSLConfiguration.createSslKeyStore(nodeCaCert: X509Certificate, nodeCaKeyPair: KeyPair, legalName: CordaX500Name, intermediateCa: CertificateAndKeyPair, rootCert: X509Certificate) {
|
||||
val tlsKeyPair = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
val tlsCert = X509Utilities.createCertificate(CertificateType.TLS, nodeCaCert, nodeCaKeyPair, legalName.x500Principal, tlsKeyPair.public)
|
||||
|
||||
@ -45,6 +52,20 @@ fun SSLConfiguration.createDevKeyStores(legalName: CordaX500Name,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the node key store needed by a node.
|
||||
*/
|
||||
fun SSLConfiguration.createNodeKeyStore(nodeCaCert: X509Certificate, nodeCaKeyPair: KeyPair, intermediateCa: CertificateAndKeyPair, rootCert: X509Certificate) {
|
||||
loadOrCreateKeyStore(nodeKeystore, keyStorePassword).apply {
|
||||
addOrReplaceKey(
|
||||
X509Utilities.CORDA_CLIENT_CA,
|
||||
nodeCaKeyPair.private,
|
||||
keyStorePassword.toCharArray(),
|
||||
arrayOf(nodeCaCert, intermediateCa.certificate, rootCert))
|
||||
save(nodeKeystore, keyStorePassword)
|
||||
}
|
||||
}
|
||||
|
||||
fun createDevNetworkMapCa(rootCa: CertificateAndKeyPair = DEV_ROOT_CA): CertificateAndKeyPair {
|
||||
val keyPair = generateKeyPair()
|
||||
val cert = X509Utilities.createCertificate(
|
||||
|
@ -38,7 +38,7 @@ operator fun <T : Any> Config.getValue(receiver: Any, metadata: KProperty<*>): T
|
||||
}
|
||||
|
||||
fun <T : Any> Config.parseAs(clazz: KClass<T>): T {
|
||||
require(clazz.isData) { "Only Kotlin data classes can be parsed" }
|
||||
require(clazz.isData) { "Only Kotlin data classes can be parsed. Offending: ${clazz.qualifiedName}" }
|
||||
val constructor = clazz.primaryConstructor!!
|
||||
val args = constructor.parameters
|
||||
.filterNot { it.isOptional && !hasPath(it.name!!) }
|
||||
|
@ -3,6 +3,7 @@
|
||||
package net.corda.nodeapi.internal.crypto
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.internal.read
|
||||
import net.corda.core.internal.write
|
||||
@ -30,6 +31,7 @@ fun loadOrCreateKeyStore(keyStoreFilePath: Path, storePassword: String): KeyStor
|
||||
keyStoreFilePath.read { keyStore.load(it, pass) }
|
||||
} else {
|
||||
keyStore.load(null, pass)
|
||||
keyStoreFilePath.parent.createDirectories()
|
||||
keyStoreFilePath.write { keyStore.store(it, pass) }
|
||||
}
|
||||
return keyStore
|
||||
|
@ -94,7 +94,7 @@ class AuthDBTests : NodeBasedTest() {
|
||||
)
|
||||
|
||||
node = startNode(ALICE_NAME, rpcUsers = emptyList(), configOverrides = securityConfig)
|
||||
client = CordaRPCClient(node.internals.configuration.rpcAddress!!)
|
||||
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -161,7 +161,6 @@ class AMQPBridgeTest {
|
||||
artemisServer.stop()
|
||||
artemisLegacyClient.stop()
|
||||
artemisLegacyServer.stop()
|
||||
|
||||
}
|
||||
|
||||
private fun createArtemis(sourceQueueName: String?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
@ -170,6 +169,7 @@ class AMQPBridgeTest {
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).exportJMXto
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(true).whenever(it).useAMQPBridges
|
||||
@ -180,7 +180,7 @@ class AMQPBridgeTest {
|
||||
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
@ -198,6 +198,7 @@ class AMQPBridgeTest {
|
||||
doReturn(BOB_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).exportJMXto
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(false).whenever(it).useAMQPBridges
|
||||
@ -209,7 +210,7 @@ class AMQPBridgeTest {
|
||||
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
|
@ -226,6 +226,7 @@ class ProtonWrapperTests {
|
||||
doReturn(CHARLIE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).exportJMXto
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(true).whenever(it).useAMQPBridges
|
||||
@ -236,7 +237,7 @@ class ProtonWrapperTests {
|
||||
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
val server = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
|
||||
server.start()
|
||||
client.start()
|
||||
|
@ -35,7 +35,7 @@ class NetworkMapTest {
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
|
||||
private val cacheTimeout = 1.seconds
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
private val portAllocation = PortAllocation.RandomFree
|
||||
|
||||
private lateinit var networkMapServer: NetworkMapServer
|
||||
private lateinit var compatibilityZone: CompatibilityZoneParams
|
||||
|
@ -0,0 +1,64 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.Permissions.Companion.all
|
||||
import net.corda.node.testsupport.withCertificates
|
||||
import net.corda.node.testsupport.withKeyStores
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.useSslRpcOverrides
|
||||
import net.corda.testing.node.User
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
|
||||
class RpcSslTest {
|
||||
@Test
|
||||
fun rpc_client_using_ssl() {
|
||||
val user = User("mark", "dadada", setOf(all()))
|
||||
withCertificates { server, client, createSelfSigned, createSignedBy ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
val markCertificate = createSignedBy(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"), rootCertificate)
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["mark"] = markCertificate
|
||||
|
||||
client.keyStore["mark"] = markCertificate
|
||||
client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { nodeSslOptions, clientSslOptions ->
|
||||
var successful = false
|
||||
driver(isDebug = true, startNodesInProcess = true, portAllocation = PortAllocation.RandomFree) {
|
||||
startNode(rpcUsers = listOf(user), customOverrides = nodeSslOptions.useSslRpcOverrides()).getOrThrow().use { node ->
|
||||
node.rpcClientToNode(clientSslOptions).start(user.username, user.password).use { connection ->
|
||||
connection.proxy.apply {
|
||||
nodeInfo()
|
||||
successful = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assertThat(successful).isTrue()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_client_not_using_ssl() {
|
||||
val user = User("mark", "dadada", setOf(all()))
|
||||
var successful = false
|
||||
driver(isDebug = true, startNodesInProcess = true, portAllocation = PortAllocation.RandomFree) {
|
||||
startNode(rpcUsers = listOf(user)).getOrThrow().use { node ->
|
||||
node.rpcClientToNode().start(user.username, user.password).use { connection ->
|
||||
connection.proxy.apply {
|
||||
nodeInfo()
|
||||
successful = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assertThat(successful).isTrue()
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.Test
|
||||
@ -69,7 +70,7 @@ class LargeTransactionsTest {
|
||||
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 1)
|
||||
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 2)
|
||||
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024 * 1024 * 3, 3)
|
||||
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts")) {
|
||||
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts"), portAllocation = PortAllocation.RandomFree) {
|
||||
val rpcUser = User("admin", "admin", setOf("ALL"))
|
||||
val (alice, _) = listOf(ALICE_NAME, BOB_NAME).map { startNode(providedName = it, rpcUsers = listOf(rpcUser)) }.transpose().getOrThrow()
|
||||
alice.rpcClientToNode().use(rpcUser.username, rpcUser.password) {
|
||||
|
@ -27,7 +27,7 @@ import java.nio.file.Files
|
||||
/**
|
||||
* Runs the security tests with the attacker pretending to be a node on the network.
|
||||
*/
|
||||
class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
||||
override fun createAttacker(): SimpleMQClient {
|
||||
return clientTo(alice.internals.configuration.p2pAddress)
|
||||
}
|
||||
@ -67,7 +67,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
|
||||
@Test
|
||||
fun `login to a non ssl port as a node user`() {
|
||||
val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null)
|
||||
val attacker = clientTo(alice.internals.configuration.rpcOptions.address!!, sslConfiguration = null)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start(NODE_USER, NODE_USER, enableSSL = false)
|
||||
}
|
||||
@ -75,7 +75,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
|
||||
@Test
|
||||
fun `login to a non ssl port as a peer user`() {
|
||||
val attacker = clientTo(alice.internals.configuration.rpcAddress!!, sslConfiguration = null)
|
||||
val attacker = clientTo(alice.internals.configuration.rpcOptions.address!!, sslConfiguration = null)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start(PEER_USER, PEER_USER, enableSSL = false) // Login as a peer
|
||||
}
|
||||
|
@ -6,9 +6,9 @@ import org.junit.Test
|
||||
/**
|
||||
* Runs the security tests with the attacker being a valid RPC user of Alice.
|
||||
*/
|
||||
class MQSecurityAsRPCTest : MQSecurityTest() {
|
||||
class MQSecurityAsRPCTest : RPCMQSecurityTest() {
|
||||
override fun createAttacker(): SimpleMQClient {
|
||||
return clientTo(alice.internals.configuration.rpcAddress!!)
|
||||
return clientTo(alice.internals.configuration.rpcOptions.address!!)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -21,8 +21,6 @@ import net.corda.node.internal.StartedNode
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
@ -69,46 +67,6 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
clients.forEach { it.stop() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from P2P queue`() {
|
||||
assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from peer queue`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for peer which has not been communicated with`() {
|
||||
val bob = startNode(BOB_NAME)
|
||||
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for unknown peer`() {
|
||||
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
|
||||
assertAllQueueCreationAttacksFail(invalidPeerQueue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from RPC requests queue`() {
|
||||
assertConsumeAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from logged in user's RPC queue`() {
|
||||
val user1Queue = loginToRPCAndGetClientQueue()
|
||||
assertConsumeAttackFails(user1Queue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for valid RPC user`() {
|
||||
val user1Queue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.${random63BitValue()}"
|
||||
@ -155,9 +113,9 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
|
||||
fun loginToRPCAndGetClientQueue(): String {
|
||||
loginToRPC(alice.internals.configuration.rpcAddress!!, rpcUser)
|
||||
loginToRPC(alice.internals.configuration.rpcOptions.address!!, rpcUser)
|
||||
val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
|
||||
val client = clientTo(alice.internals.configuration.rpcAddress!!)
|
||||
val client = clientTo(alice.internals.configuration.rpcOptions.address!!)
|
||||
client.start(rpcUser.username, rpcUser.password, false)
|
||||
return client.session.addressQuery(clientQueueQuery).queueNames.single().toString()
|
||||
}
|
||||
@ -178,6 +136,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
}
|
||||
|
||||
fun assertAttackFailsNonexistent(queue: String, attack: () -> Unit) {
|
||||
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java)
|
||||
.isThrownBy(attack)
|
||||
.withMessageContaining(queue)
|
||||
}
|
||||
|
||||
fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) {
|
||||
val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE"
|
||||
assertAttackFails(queue, permission) {
|
||||
@ -208,6 +172,15 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
}
|
||||
|
||||
fun assertConsumeAttackFailsNonexistent(queue: String) {
|
||||
assertAttackFailsNonexistent(queue) {
|
||||
attacker.session.createConsumer(queue)
|
||||
}
|
||||
assertAttackFailsNonexistent(queue) {
|
||||
attacker.session.createConsumer(queue, true)
|
||||
}
|
||||
}
|
||||
|
||||
fun assertAttackFails(queue: String, permission: String, attack: () -> Unit) {
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java)
|
||||
.isThrownBy(attack)
|
||||
@ -215,7 +188,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
.withMessageContaining(permission)
|
||||
}
|
||||
|
||||
private fun startBobAndCommunicateWithAlice(): Party {
|
||||
protected fun startBobAndCommunicateWithAlice(): Party {
|
||||
val bob = startNode(BOB_NAME)
|
||||
bob.registerInitiatedFlow(ReceiveFlow::class.java)
|
||||
val bobParty = bob.info.chooseIdentity()
|
||||
|
@ -0,0 +1,56 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.chooseIdentity
|
||||
import org.junit.Test
|
||||
|
||||
/**
|
||||
* Runs a series of MQ-related attacks against a node. Subclasses need to call [startAttacker] to connect
|
||||
* the attacker to [alice].
|
||||
*/
|
||||
abstract class P2PMQSecurityTest : MQSecurityTest() {
|
||||
@Test
|
||||
fun `consume message from P2P queue`() {
|
||||
assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from peer queue`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for peer which has not been communicated with`() {
|
||||
val bob = startNode(BOB_NAME)
|
||||
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for unknown peer`() {
|
||||
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
|
||||
assertAllQueueCreationAttacksFail(invalidPeerQueue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from RPC requests queue`() {
|
||||
assertConsumeAttackFailsNonexistent(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from logged in user's RPC queue`() {
|
||||
val user1Queue = loginToRPCAndGetClientQueue()
|
||||
assertConsumeAttackFailsNonexistent(user1Queue)
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.chooseIdentity
|
||||
import org.junit.Test
|
||||
|
||||
/**
|
||||
* Runs a series of MQ-related attacks against a node. Subclasses need to call [startAttacker] to connect
|
||||
* the attacker to [alice].
|
||||
*/
|
||||
abstract class RPCMQSecurityTest : MQSecurityTest() {
|
||||
@Test
|
||||
fun `consume message from P2P queue`() {
|
||||
assertConsumeAttackFailsNonexistent("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from peer queue`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for peer which has not been communicated with`() {
|
||||
val bob = startNode(BOB_NAME)
|
||||
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for unknown peer`() {
|
||||
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}"
|
||||
assertConsumeAttackFailsNonexistent(invalidPeerQueue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from RPC requests queue`() {
|
||||
assertConsumeAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from logged in user's RPC queue`() {
|
||||
val user1Queue = loginToRPCAndGetClientQueue()
|
||||
assertConsumeAttackFails(user1Queue)
|
||||
}
|
||||
}
|
@ -21,6 +21,7 @@ import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.core.chooseIdentity
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Assume.assumeFalse
|
||||
import org.junit.Test
|
||||
@ -40,7 +41,7 @@ class NodeStatePersistenceTests {
|
||||
|
||||
val user = User("mark", "dadada", setOf(startFlow<SendMessageFlow>(), invokeRpc("vaultQuery")))
|
||||
val message = Message("Hello world!")
|
||||
val stateAndRef: StateAndRef<MessageState>? = driver(isDebug = true, startNodesInProcess = isQuasarAgentSpecified()) {
|
||||
val stateAndRef: StateAndRef<MessageState>? = driver(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = PortAllocation.RandomFree) {
|
||||
val nodeName = {
|
||||
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
|
||||
@ -74,7 +75,7 @@ class NodeStatePersistenceTests {
|
||||
|
||||
val user = User("mark", "dadada", setOf(startFlow<SendMessageFlow>(), invokeRpc("vaultQuery")))
|
||||
val message = Message("Hello world!")
|
||||
val stateAndRef: StateAndRef<MessageState>? = driver(isDebug = true, startNodesInProcess = isQuasarAgentSpecified()) {
|
||||
val stateAndRef: StateAndRef<MessageState>? = driver(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = PortAllocation.RandomFree) {
|
||||
val nodeName = {
|
||||
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
|
||||
|
@ -0,0 +1,13 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
interface LifecycleSupport : Startable, Stoppable
|
||||
|
||||
interface Stoppable {
|
||||
fun stop()
|
||||
}
|
||||
|
||||
interface Startable {
|
||||
fun start()
|
||||
|
||||
val started: Boolean
|
||||
}
|
@ -4,6 +4,7 @@ import com.codahale.metrics.JmxReporter
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.NodeInfo
|
||||
@ -15,6 +16,8 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.artemis.BrokerAddresses
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
@ -23,6 +26,7 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
@ -36,6 +40,7 @@ import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Scheduler
|
||||
import rx.schedulers.Schedulers
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
@ -132,6 +137,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
||||
|
||||
private var messageBroker: ArtemisMessagingServer? = null
|
||||
private var rpcBroker: ArtemisBroker? = null
|
||||
|
||||
private var shutdownHook: ShutdownHook? = null
|
||||
|
||||
@ -139,15 +145,16 @@ open class Node(configuration: NodeConfiguration,
|
||||
// Construct security manager reading users data either from the 'security' config section
|
||||
// if present or from rpcUsers list if the former is missing from config.
|
||||
val securityManagerConfig = configuration.security?.authService ?:
|
||||
SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
|
||||
SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
|
||||
|
||||
securityManager = RPCSecurityManagerImpl(securityManagerConfig)
|
||||
|
||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
||||
val advertisedAddress = info.addresses.single()
|
||||
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
|
||||
val advertisedAddress = info.addresses.first()
|
||||
|
||||
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
||||
rpcMessagingClient = RPCMessagingClient(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||
rpcMessagingClient = RPCMessagingClient(configuration.rpcOptions.sslConfig, rpcServerAddresses.admin, networkParameters.maxMessageSize)
|
||||
verifierMessagingClient = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize)
|
||||
VerifierType.InMemory -> null
|
||||
@ -166,15 +173,38 @@ open class Node(configuration: NodeConfiguration,
|
||||
networkParameters.maxMessageSize)
|
||||
}
|
||||
|
||||
private fun startLocalRpcBroker(): BrokerAddresses {
|
||||
with(configuration) {
|
||||
require(rpcOptions.address != null) { "RPC address needs to be specified for local RPC broker." }
|
||||
val rpcBrokerDirectory: Path = baseDirectory / "brokers" / "rpc"
|
||||
with(rpcOptions) {
|
||||
rpcBroker = if (useSsl) {
|
||||
ArtemisRpcBroker.withSsl(this.address!!, sslConfig, securityManager, certificateChainCheckPolicies, networkParameters.maxMessageSize, exportJMXto.isNotEmpty(), rpcBrokerDirectory)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(this.address!!, adminAddress!!, sslConfig, securityManager, certificateChainCheckPolicies, networkParameters.maxMessageSize, exportJMXto.isNotEmpty(), rpcBrokerDirectory)
|
||||
}
|
||||
}
|
||||
return rpcBroker!!.addresses
|
||||
}
|
||||
}
|
||||
|
||||
private fun makeLocalMessageBroker(): NetworkHostAndPort {
|
||||
with(configuration) {
|
||||
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, securityManager, networkParameters.maxMessageSize)
|
||||
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, securityManager, networkParameters.maxMessageSize)
|
||||
return NetworkHostAndPort("localhost", p2pAddress.port)
|
||||
}
|
||||
}
|
||||
|
||||
override fun myAddresses(): List<NetworkHostAndPort> {
|
||||
return listOf(configuration.messagingServerAddress ?: getAdvertisedAddress())
|
||||
val addresses = mutableListOf<NetworkHostAndPort>()
|
||||
addresses.add(configuration.messagingServerAddress ?: getAdvertisedAddress())
|
||||
rpcBroker?.addresses?.let {
|
||||
addresses.add(it.primary)
|
||||
if (it.admin != it.primary) {
|
||||
addresses.add(it.admin)
|
||||
}
|
||||
}
|
||||
return addresses
|
||||
}
|
||||
|
||||
private fun getAdvertisedAddress(): NetworkHostAndPort {
|
||||
@ -220,12 +250,16 @@ open class Node(configuration: NodeConfiguration,
|
||||
override fun startMessagingService(rpcOps: RPCOps) {
|
||||
// Start up the embedded MQ server
|
||||
messageBroker?.apply {
|
||||
runOnStop += this::stop
|
||||
runOnStop += this::close
|
||||
start()
|
||||
}
|
||||
rpcBroker?.apply {
|
||||
runOnStop += this::close
|
||||
start()
|
||||
}
|
||||
// Start up the MQ clients.
|
||||
rpcMessagingClient.run {
|
||||
runOnStop += this::stop
|
||||
runOnStop += this::close
|
||||
start(rpcOps, securityManager)
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
@ -331,7 +365,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
private var verifierMessagingClient: VerifierMessagingClient? = null
|
||||
/** Starts a blocking event loop for message dispatch. */
|
||||
fun run() {
|
||||
rpcMessagingClient.start2(messageBroker!!.serverControl)
|
||||
rpcMessagingClient.start2(rpcBroker!!.serverControl)
|
||||
verifierMessagingClient?.start2()
|
||||
(network as P2PMessagingClient).run()
|
||||
}
|
||||
|
@ -0,0 +1,17 @@
|
||||
package net.corda.node.internal.artemis
|
||||
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.LifecycleSupport
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
|
||||
interface ArtemisBroker : LifecycleSupport, AutoCloseable {
|
||||
val addresses: BrokerAddresses
|
||||
|
||||
val serverControl: ActiveMQServerControl
|
||||
|
||||
override fun close() = stop()
|
||||
}
|
||||
|
||||
data class BrokerAddresses(val primary: NetworkHostAndPort, private val adminArg: NetworkHostAndPort?) {
|
||||
val admin = adminArg ?: primary
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
package net.corda.node.internal.artemis
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import java.security.KeyStore
|
||||
import javax.security.cert.CertificateException
|
||||
import javax.security.cert.X509Certificate
|
||||
|
||||
sealed class CertificateChainCheckPolicy {
|
||||
@FunctionalInterface
|
||||
interface Check {
|
||||
fun checkCertificateChain(theirChain: Array<X509Certificate>)
|
||||
}
|
||||
|
||||
abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check
|
||||
|
||||
object Any : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
|
||||
// nothing to do here
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RootMustMatch : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val rootPublicKey = trustStore.getCertificate(X509Utilities.CORDA_ROOT_CA).publicKey
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
|
||||
val theirRoot = theirChain.last().publicKey
|
||||
if (rootPublicKey != theirRoot) {
|
||||
throw CertificateException("Root certificate mismatch, their root = $theirRoot")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object LeafMustMatch : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val ourPublicKey = keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS).publicKey
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
|
||||
val theirLeaf = theirChain.first().publicKey
|
||||
if (ourPublicKey != theirLeaf) {
|
||||
throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class MustContainOneOf(private val trustedAliases: Set<String>) : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
|
||||
if (!theirChain.any { it.publicKey in trustedPublicKeys }) {
|
||||
throw CertificateException("Their certificate chain contained none of the trusted ones")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object UsernameMustMatchCommonName : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
return UsernameMustMatchCommonNameCheck()
|
||||
}
|
||||
}
|
||||
|
||||
class UsernameMustMatchCommonNameCheck : Check {
|
||||
lateinit var username: String
|
||||
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
|
||||
if (!theirChain.any { certificate -> CordaX500Name.parse(certificate.subjectDN.name).commonName == username }) {
|
||||
throw CertificateException("Client certificate does not match login username.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package net.corda.node.internal.artemis
|
||||
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import java.math.BigInteger
|
||||
|
||||
internal open class SecureArtemisConfiguration : ConfigurationImpl() {
|
||||
init {
|
||||
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
|
||||
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its
|
||||
// password be changed from the default (as warned in the docs). Since we don't need this feature we turn it off
|
||||
// by having its password be an unknown securely random 128-bit value.
|
||||
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
|
||||
}
|
||||
}
|
@ -3,10 +3,14 @@ package net.corda.node.services.config
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.messaging.CertificateChainCheckPolicy
|
||||
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -34,7 +38,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val activeMQServer: ActiveMqServerConfiguration
|
||||
val additionalNodeInfoPollingFrequencyMsec: Long
|
||||
val p2pAddress: NetworkHostAndPort
|
||||
val rpcAddress: NetworkHostAndPort?
|
||||
val rpcOptions: NodeRpcOptions
|
||||
val messagingServerAddress: NetworkHostAndPort?
|
||||
// TODO Move into DevModeOptions
|
||||
val useTestClock: Boolean get() = false
|
||||
@ -46,7 +50,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
|
||||
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
||||
|
||||
|
||||
companion object {
|
||||
// default to at least 8MB and a bit extra for larger heap sizes
|
||||
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
||||
@ -100,7 +103,7 @@ data class BridgeConfiguration(val retryIntervalMs: Long,
|
||||
|
||||
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
|
||||
|
||||
fun Config.parseAsNodeConfiguration(): NodeConfiguration = this.parseAs<NodeConfigurationImpl>()
|
||||
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
|
||||
|
||||
data class NodeConfigurationImpl(
|
||||
/** This is not retrieved from the config file but rather from a command line argument. */
|
||||
@ -118,7 +121,8 @@ data class NodeConfigurationImpl(
|
||||
// Then rename this to messageRedeliveryDelay and make it of type Duration
|
||||
override val messageRedeliveryDelaySeconds: Int = 30,
|
||||
override val p2pAddress: NetworkHostAndPort,
|
||||
override val rpcAddress: NetworkHostAndPort?,
|
||||
private val rpcAddress: NetworkHostAndPort? = null,
|
||||
private val rpcSettings: NodeRpcSettings,
|
||||
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
|
||||
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
|
||||
override val messagingServerAddress: NetworkHostAndPort?,
|
||||
@ -137,7 +141,23 @@ data class NodeConfigurationImpl(
|
||||
private val transactionCacheSizeMegaBytes: Int? = null,
|
||||
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
||||
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
|
||||
) : NodeConfiguration {
|
||||
) : NodeConfiguration {
|
||||
companion object {
|
||||
private val logger = loggerFor<NodeConfigurationImpl>()
|
||||
}
|
||||
|
||||
override val rpcOptions: NodeRpcOptions = initialiseRpcOptions(rpcAddress, rpcSettings, SslOptions(baseDirectory / "certificates", keyStorePassword, trustStorePassword))
|
||||
|
||||
private fun initialiseRpcOptions(explicitAddress: NetworkHostAndPort?, settings: NodeRpcSettings, fallbackSslOptions: SSLConfiguration): NodeRpcOptions {
|
||||
return when {
|
||||
explicitAddress != null -> {
|
||||
require(settings.address == null) { "Can't provide top-level rpcAddress and rpcSettings.address (they control the same property)." }
|
||||
logger.warn("Top-level declaration of property 'rpcAddress' is deprecated. Please use 'rpcSettings.address' instead.")
|
||||
settings.copy(address = explicitAddress)
|
||||
}
|
||||
else -> settings
|
||||
}.asOptions(fallbackSslOptions)
|
||||
}
|
||||
|
||||
override val exportJMXto: String get() = "http"
|
||||
override val transactionCacheSizeBytes: Long
|
||||
@ -156,6 +176,28 @@ data class NodeConfigurationImpl(
|
||||
}
|
||||
}
|
||||
|
||||
data class NodeRpcSettings(
|
||||
val address: NetworkHostAndPort?,
|
||||
val adminAddress: NetworkHostAndPort?,
|
||||
val standAloneBroker: Boolean = false,
|
||||
val useSsl: Boolean = false,
|
||||
val ssl: SslOptions?
|
||||
) {
|
||||
fun asOptions(fallbackSslOptions: SSLConfiguration): NodeRpcOptions {
|
||||
return object : NodeRpcOptions {
|
||||
override val address = this@NodeRpcSettings.address
|
||||
override val adminAddress = this@NodeRpcSettings.adminAddress
|
||||
override val standAloneBroker = this@NodeRpcSettings.standAloneBroker
|
||||
override val useSsl = this@NodeRpcSettings.useSsl
|
||||
override val sslConfig = this@NodeRpcSettings.ssl ?: fallbackSslOptions
|
||||
|
||||
override fun toString(): String {
|
||||
return "address: $address, adminAddress: $adminAddress, standAloneBroker: $standAloneBroker, useSsl: $useSsl, sslConfig: $sslConfig"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
@ -165,7 +207,8 @@ enum class CertChainPolicyType {
|
||||
Any,
|
||||
RootMustMatch,
|
||||
LeafMustMatch,
|
||||
MustContainOneOf
|
||||
MustContainOneOf,
|
||||
UsernameMustMatch
|
||||
}
|
||||
|
||||
data class CertChainPolicyConfig(val role: String, private val policy: CertChainPolicyType, private val trustedAliases: Set<String>) {
|
||||
@ -176,6 +219,7 @@ data class CertChainPolicyConfig(val role: String, private val policy: CertChain
|
||||
CertChainPolicyType.RootMustMatch -> CertificateChainCheckPolicy.RootMustMatch
|
||||
CertChainPolicyType.LeafMustMatch -> CertificateChainCheckPolicy.LeafMustMatch
|
||||
CertChainPolicyType.MustContainOneOf -> CertificateChainCheckPolicy.MustContainOneOf(trustedAliases)
|
||||
CertChainPolicyType.UsernameMustMatch -> CertificateChainCheckPolicy.UsernameMustMatchCommonName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,13 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
|
||||
data class SslOptions(override val certificatesDirectory: Path, override val keyStorePassword: String, override val trustStorePassword: String) : SSLConfiguration {
|
||||
constructor(certificatesDirectory: String, keyStorePassword: String, trustStorePassword: String) : this(certificatesDirectory.toAbsolutePath(), keyStorePassword, trustStorePassword)
|
||||
|
||||
fun copy(certificatesDirectory: String = this.certificatesDirectory.toString(), keyStorePassword: String = this.keyStorePassword, trustStorePassword: String = this.trustStorePassword): SslOptions = copy(certificatesDirectory = certificatesDirectory.toAbsolutePath(), keyStorePassword = keyStorePassword, trustStorePassword = trustStorePassword)
|
||||
}
|
||||
|
||||
private fun String.toAbsolutePath() = Paths.get(this).toAbsolutePath()
|
@ -0,0 +1,12 @@
|
||||
package net.corda.node.services.config.rpc
|
||||
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
|
||||
interface NodeRpcOptions {
|
||||
val address: NetworkHostAndPort?
|
||||
val adminAddress: NetworkHostAndPort?
|
||||
val standAloneBroker: Boolean
|
||||
val useSsl: Boolean
|
||||
val sslConfig: SSLConfiguration
|
||||
}
|
@ -37,7 +37,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, private val s
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
}
|
||||
val sessionFactory = locator.createSessionFactory()
|
||||
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
|
||||
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
|
@ -1,7 +1,6 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.core.crypto.AddressFormatException
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.div
|
||||
@ -14,17 +13,18 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.security.Password
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.artemis.BrokerAddresses
|
||||
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
||||
import net.corda.node.internal.artemis.SecureArtemisConfiguration
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
@ -34,32 +34,23 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.core.config.Configuration
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
|
||||
import rx.Subscription
|
||||
import java.io.IOException
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyStore
|
||||
import java.security.KeyStoreException
|
||||
import java.security.Principal
|
||||
import java.util.*
|
||||
@ -67,14 +58,12 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.security.auth.Subject
|
||||
import javax.security.auth.callback.CallbackHandler
|
||||
import javax.security.auth.callback.NameCallback
|
||||
import javax.security.auth.callback.PasswordCallback
|
||||
import javax.security.auth.callback.UnsupportedCallbackException
|
||||
import javax.security.auth.login.AppConfigurationEntry
|
||||
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
|
||||
import javax.security.auth.login.FailedLoginException
|
||||
import javax.security.auth.login.LoginException
|
||||
import javax.security.auth.spi.LoginModule
|
||||
import javax.security.cert.CertificateException
|
||||
|
||||
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
|
||||
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
|
||||
@ -92,10 +81,9 @@ import javax.security.cert.CertificateException
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val p2pPort: Int,
|
||||
val rpcPort: Int?,
|
||||
val networkMapCache: NetworkMapCacheInternal,
|
||||
val securityManager: RPCSecurityManager,
|
||||
val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
@ -105,7 +93,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private lateinit var activeMQServer: ActiveMQServer
|
||||
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
|
||||
override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
|
||||
private var networkChangeHandle: Subscription? = null
|
||||
private lateinit var bridgeManager: BridgeManager
|
||||
|
||||
@ -117,8 +105,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
* The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange]
|
||||
* We assume network map will be updated accordingly when the client node register with the network map.
|
||||
*/
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
fun start() = mutex.locked {
|
||||
override fun start() = mutex.locked {
|
||||
if (!running) {
|
||||
configureAndStartServer()
|
||||
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
|
||||
@ -126,7 +113,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() = mutex.locked {
|
||||
override fun stop() = mutex.locked {
|
||||
bridgeManager.close()
|
||||
networkChangeHandle?.unsubscribe()
|
||||
networkChangeHandle = null
|
||||
@ -134,6 +121,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
running = false
|
||||
}
|
||||
|
||||
override val addresses = config.p2pAddress.let { BrokerAddresses(it, it) }
|
||||
|
||||
override val started: Boolean
|
||||
get() = activeMQServer.isStarted
|
||||
|
||||
// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from
|
||||
// Artemis IO errors
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
@ -157,12 +149,9 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
activeMQServer.start()
|
||||
bridgeManager.start()
|
||||
Node.printBasicNodeInfo("Listening on port", p2pPort.toString())
|
||||
if (rpcPort != null) {
|
||||
Node.printBasicNodeInfo("RPC service listening on port", rpcPort.toString())
|
||||
}
|
||||
}
|
||||
|
||||
private fun createArtemisConfig() = ConfigurationImpl().apply {
|
||||
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
|
||||
val artemisDir = config.baseDirectory / "artemis"
|
||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||
journalDirectory = (artemisDir / "journal").toString()
|
||||
@ -171,9 +160,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
acceptorFactoryClassName = NettyAcceptorFactory::class.java.name
|
||||
)
|
||||
val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pPort))
|
||||
if (rpcPort != null) {
|
||||
acceptors.add(createTcpTransport(connectionDirection, "0.0.0.0", rpcPort, enableSSL = false))
|
||||
}
|
||||
acceptorConfigurations = acceptors
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||
@ -184,35 +170,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
||||
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB.
|
||||
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
|
||||
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
|
||||
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its
|
||||
// password be changed from the default (as warned in the docs). Since we don't need this feature we turn it off
|
||||
// by having its password be an unknown securely random 128-bit value.
|
||||
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
|
||||
queueConfigurations = listOf(
|
||||
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
|
||||
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
|
||||
// but these queues are not worth persisting.
|
||||
queueConfig(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
|
||||
queueConfig(
|
||||
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS,
|
||||
address = NOTIFICATIONS_ADDRESS,
|
||||
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
|
||||
durable = false
|
||||
),
|
||||
queueConfig(
|
||||
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS,
|
||||
address = NOTIFICATIONS_ADDRESS,
|
||||
filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION,
|
||||
durable = false
|
||||
)
|
||||
)
|
||||
addressesSettings = mapOf(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||
maxSizeBytes = 10L * maxMessageSize
|
||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
||||
}
|
||||
)
|
||||
|
||||
// JMX enablement
|
||||
if (config.exportJMXto.isNotEmpty()) {
|
||||
isJMXManagementEnabled = true
|
||||
@ -221,16 +179,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
|
||||
}.configureAddressSecurity()
|
||||
|
||||
|
||||
private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
|
||||
return CoreQueueConfiguration().apply {
|
||||
this.name = name
|
||||
this.address = address
|
||||
filterString = filter
|
||||
isDurable = durable
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticated clients connecting to us fall in one of the following groups:
|
||||
* 1. The node itself. It is given full access to all valid queues.
|
||||
@ -242,24 +190,9 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
|
||||
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
|
||||
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
|
||||
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
|
||||
// Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues
|
||||
// and stealing RPC responses.
|
||||
val rolesAdderOnLogin = RolesAdderOnLogin { username ->
|
||||
Pair(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#",
|
||||
setOf(
|
||||
nodeInternalRole,
|
||||
restrictedRole(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username",
|
||||
consume = true,
|
||||
createNonDurableQueue = true,
|
||||
deleteNonDurableQueue = true)))
|
||||
}
|
||||
securitySettingPlugins.add(rolesAdderOnLogin)
|
||||
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
|
||||
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.#"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
|
||||
val onLoginListener = { username: String -> rolesAdderOnLogin.onLogin(username) }
|
||||
val onLoginListener = { _: String -> }
|
||||
return Pair(this, onLoginListener)
|
||||
}
|
||||
|
||||
@ -369,66 +302,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
}
|
||||
|
||||
sealed class CertificateChainCheckPolicy {
|
||||
|
||||
@FunctionalInterface
|
||||
interface Check {
|
||||
fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>)
|
||||
}
|
||||
|
||||
abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check
|
||||
|
||||
object Any : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RootMustMatch : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val rootPublicKey = trustStore.getCertificate(CORDA_ROOT_CA).publicKey
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
|
||||
val theirRoot = theirChain.last().publicKey
|
||||
if (rootPublicKey != theirRoot) {
|
||||
throw CertificateException("Root certificate mismatch, their root = $theirRoot")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object LeafMustMatch : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val ourPublicKey = keyStore.getCertificate(CORDA_CLIENT_TLS).publicKey
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
|
||||
val theirLeaf = theirChain.first().publicKey
|
||||
if (ourPublicKey != theirLeaf) {
|
||||
throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class MustContainOneOf(val trustedAliases: Set<String>) : CertificateChainCheckPolicy() {
|
||||
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
|
||||
val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
|
||||
return object : Check {
|
||||
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
|
||||
if (!theirChain.any { it.publicKey in trustedPublicKeys }) {
|
||||
throw CertificateException("Their certificate chain contained none of the trusted ones")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
|
||||
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
|
||||
@ -445,7 +318,6 @@ class NodeLoginModule : LoginModule {
|
||||
// Include forbidden username character to prevent name clash with any RPC usernames
|
||||
const val PEER_ROLE = "SystemRoles/Peer"
|
||||
const val NODE_ROLE = "SystemRoles/Node"
|
||||
const val RPC_ROLE = "SystemRoles/RPC"
|
||||
const val VERIFIER_ROLE = "SystemRoles/Verifier"
|
||||
|
||||
const val CERT_CHAIN_CHECKS_OPTION_NAME = "CertChainChecks"
|
||||
@ -475,10 +347,9 @@ class NodeLoginModule : LoginModule {
|
||||
|
||||
override fun login(): Boolean {
|
||||
val nameCallback = NameCallback("Username: ")
|
||||
val passwordCallback = PasswordCallback("Password: ", false)
|
||||
val certificateCallback = CertificateCallback()
|
||||
try {
|
||||
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
|
||||
callbackHandler.handle(arrayOf(nameCallback, certificateCallback))
|
||||
} catch (e: IOException) {
|
||||
throw LoginException(e.message)
|
||||
} catch (e: UnsupportedCallbackException) {
|
||||
@ -486,7 +357,6 @@ class NodeLoginModule : LoginModule {
|
||||
}
|
||||
|
||||
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
|
||||
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
|
||||
val certificates = certificateCallback.certificates
|
||||
|
||||
log.debug { "Processing login for $username" }
|
||||
@ -496,7 +366,6 @@ class NodeLoginModule : LoginModule {
|
||||
PEER_ROLE -> authenticatePeer(certificates)
|
||||
NODE_ROLE -> authenticateNode(certificates)
|
||||
VERIFIER_ROLE -> authenticateVerifier(certificates)
|
||||
RPC_ROLE -> authenticateRpcUser(username, Password(password))
|
||||
else -> throw FailedLoginException("Peer does not belong on our network")
|
||||
}
|
||||
principals += UserPrincipal(validatedUser)
|
||||
@ -527,14 +396,6 @@ class NodeLoginModule : LoginModule {
|
||||
return certificates.first().subjectDN.name
|
||||
}
|
||||
|
||||
private fun authenticateRpcUser(username: String, password: Password): String {
|
||||
securityManager.authenticate(username, password)
|
||||
loginListener(username)
|
||||
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
|
||||
principals += RolePrincipal("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username") // This enables the RPC client to receive responses
|
||||
return username
|
||||
}
|
||||
|
||||
private fun determineUserRole(certificates: Array<javax.security.cert.X509Certificate>?, username: String): String? {
|
||||
fun requireTls() = require(certificates != null) { "No TLS?" }
|
||||
return when (username) {
|
||||
@ -550,14 +411,7 @@ class NodeLoginModule : LoginModule {
|
||||
requireTls()
|
||||
VERIFIER_ROLE
|
||||
}
|
||||
else -> {
|
||||
// Assume they're an RPC user if its from a non-ssl connection
|
||||
if (certificates == null) {
|
||||
RPC_ROLE
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
|
||||
@ -585,39 +439,4 @@ class NodeLoginModule : LoginModule {
|
||||
}
|
||||
}
|
||||
|
||||
typealias LoginListener = (String) -> Unit
|
||||
typealias RolesRepository = HierarchicalRepository<MutableSet<Role>>
|
||||
|
||||
/**
|
||||
* Helper class to dynamically assign security roles to RPC users
|
||||
* on their authentication. This object is plugged into the server
|
||||
* as [SecuritySettingPlugin]. It responds to authentication events
|
||||
* from [NodeLoginModule] by adding the address -> roles association
|
||||
* generated by the given [source], unless already done before.
|
||||
*/
|
||||
private class RolesAdderOnLogin(val source: (String) -> Pair<String, Set<Role>>)
|
||||
: SecuritySettingPlugin {
|
||||
|
||||
// Artemis internal container storing roles association
|
||||
private lateinit var repository: RolesRepository
|
||||
|
||||
fun onLogin(username: String) {
|
||||
val (address, roles) = source(username)
|
||||
val entry = repository.getMatch(address)
|
||||
if (entry == null || entry.isEmpty()) {
|
||||
repository.addMatch(address, roles.toMutableSet())
|
||||
}
|
||||
}
|
||||
|
||||
// Initializer called by the Artemis framework
|
||||
override fun setSecurityRepository(repository: RolesRepository) {
|
||||
this.repository = repository
|
||||
}
|
||||
|
||||
// Part of SecuritySettingPlugin interface which is no-op in this case
|
||||
override fun stop() = this
|
||||
|
||||
override fun init(options: MutableMap<String, String>?) = this
|
||||
|
||||
override fun getSecurityRoles() = null
|
||||
}
|
||||
typealias LoginListener = (String) -> Unit
|
@ -5,14 +5,14 @@ import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.getX509Certificate
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
|
||||
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort, maxMessageSize: Int) : SingletonSerializeAsToken(), AutoCloseable {
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
private var rpcServer: RPCServer? = null
|
||||
|
||||
@ -30,4 +30,6 @@ class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: Ne
|
||||
rpcServer?.close()
|
||||
artemis.stop()
|
||||
}
|
||||
|
||||
override fun close() = stop()
|
||||
}
|
||||
|
@ -0,0 +1,108 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.core.internal.noneOrSingle
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.artemis.BrokerAddresses
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import rx.Observable
|
||||
import java.io.IOException
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyStoreException
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.security.auth.login.AppConfigurationEntry
|
||||
|
||||
internal class ArtemisRpcBroker internal constructor(
|
||||
address: NetworkHostAndPort,
|
||||
private val adminAddressOptional: NetworkHostAndPort?,
|
||||
private val sslOptions: SSLConfiguration,
|
||||
private val useSsl: Boolean,
|
||||
private val securityManager: RPCSecurityManager,
|
||||
private val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
|
||||
private val maxMessageSize: Int,
|
||||
private val jmxEnabled: Boolean = false,
|
||||
private val baseDirectory: Path) : ArtemisBroker {
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<ArtemisRpcBroker>()
|
||||
|
||||
fun withSsl(address: NetworkHostAndPort, sslOptions: SSLConfiguration, securityManager: RPCSecurityManager, certificateChainCheckPolicies: List<CertChainPolicyConfig>, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path): ArtemisBroker {
|
||||
return ArtemisRpcBroker(address, null, sslOptions, true, securityManager, certificateChainCheckPolicies, maxMessageSize, jmxEnabled, baseDirectory)
|
||||
}
|
||||
|
||||
fun withoutSsl(address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: SSLConfiguration, securityManager: RPCSecurityManager, certificateChainCheckPolicies: List<CertChainPolicyConfig>, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path): ArtemisBroker {
|
||||
return ArtemisRpcBroker(address, adminAddress, sslOptions, false, securityManager, certificateChainCheckPolicies, maxMessageSize, jmxEnabled, baseDirectory)
|
||||
}
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
logger.debug("Artemis RPC broker is starting.")
|
||||
server.start()
|
||||
logger.debug("Artemis RPC broker is started.")
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
logger.debug("Artemis RPC broker is stopping.")
|
||||
server.stop(true)
|
||||
logger.debug("Artemis RPC broker is stopped.")
|
||||
}
|
||||
|
||||
override val started get() = server.isStarted
|
||||
|
||||
override val serverControl: ActiveMQServerControl get() = server.activeMQServerControl
|
||||
|
||||
override val addresses = BrokerAddresses(address, adminAddressOptional ?: address)
|
||||
|
||||
private val server = initialiseServer()
|
||||
|
||||
private fun initialiseServer(): ActiveMQServer {
|
||||
val serverConfiguration = RpcBrokerConfiguration(baseDirectory, maxMessageSize, jmxEnabled, addresses.primary, adminAddressOptional, sslOptions, useSsl)
|
||||
val serverSecurityManager = createArtemisSecurityManager(serverConfiguration.loginListener, sslOptions)
|
||||
|
||||
return ActiveMQServerImpl(serverConfiguration, serverSecurityManager).apply {
|
||||
registerActivationFailureListener { exception -> throw exception }
|
||||
registerPostQueueDeletionCallback { address, qName -> logger.debug("Queue deleted: $qName for $address") }
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
private fun createArtemisSecurityManager(loginListener: LoginListener, sslOptions: SSLConfiguration): ActiveMQJAASSecurityManager {
|
||||
val keyStore = loadKeyStore(sslOptions.sslKeystore, sslOptions.keyStorePassword)
|
||||
val trustStore = loadKeyStore(sslOptions.trustStoreFile, sslOptions.trustStorePassword)
|
||||
|
||||
val defaultCertPolicies = mapOf(
|
||||
NodeLoginModule.NODE_ROLE to CertificateChainCheckPolicy.LeafMustMatch,
|
||||
NodeLoginModule.RPC_ROLE to CertificateChainCheckPolicy.Any
|
||||
)
|
||||
val certChecks = defaultCertPolicies.mapValues { (role, defaultPolicy) ->
|
||||
val policy = certificateChainCheckPolicies.noneOrSingle { it.role == role }?.certificateChainCheckPolicy ?: defaultPolicy
|
||||
policy.createCheck(keyStore, trustStore)
|
||||
}
|
||||
|
||||
val securityConfig = object : SecurityConfiguration() {
|
||||
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
|
||||
val options = mapOf(
|
||||
NodeLoginModule.LOGIN_LISTENER_ARG to loginListener,
|
||||
NodeLoginModule.SECURITY_MANAGER_ARG to securityManager,
|
||||
NodeLoginModule.USE_SSL_ARG to useSsl,
|
||||
NodeLoginModule.CERT_CHAIN_CHECKS_ARG to certChecks)
|
||||
return arrayOf(AppConfigurationEntry(name, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options))
|
||||
}
|
||||
}
|
||||
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
|
||||
}
|
||||
}
|
||||
|
||||
typealias LoginListener = (String) -> Unit
|
||||
|
||||
private fun <RESULT> CompletableFuture<RESULT>.toObservable() = Observable.from(this)
|
@ -0,0 +1,169 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.security.Password
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
|
||||
import java.io.IOException
|
||||
import java.security.Principal
|
||||
import java.util.*
|
||||
import javax.security.auth.Subject
|
||||
import javax.security.auth.callback.CallbackHandler
|
||||
import javax.security.auth.callback.NameCallback
|
||||
import javax.security.auth.callback.PasswordCallback
|
||||
import javax.security.auth.callback.UnsupportedCallbackException
|
||||
import javax.security.auth.login.FailedLoginException
|
||||
import javax.security.auth.login.LoginException
|
||||
import javax.security.auth.spi.LoginModule
|
||||
import javax.security.cert.X509Certificate
|
||||
|
||||
/**
|
||||
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
|
||||
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
|
||||
* is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with
|
||||
* [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is
|
||||
* the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address.
|
||||
* In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume
|
||||
* the CN within that is their legal name.
|
||||
* Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of
|
||||
* valid RPC users. RPC clients are given permission to perform RPC and nothing else.
|
||||
*/
|
||||
internal class NodeLoginModule : LoginModule {
|
||||
companion object {
|
||||
internal const val NODE_ROLE = "SystemRoles/Node"
|
||||
internal const val RPC_ROLE = "SystemRoles/RPC"
|
||||
|
||||
internal const val CERT_CHAIN_CHECKS_ARG = "CertChainChecks"
|
||||
internal const val USE_SSL_ARG = "useSsl"
|
||||
internal val SECURITY_MANAGER_ARG = "RpcSecurityManager"
|
||||
internal val LOGIN_LISTENER_ARG = "LoginListener"
|
||||
private val log = loggerFor<NodeLoginModule>()
|
||||
}
|
||||
|
||||
private var loginSucceeded: Boolean = false
|
||||
private lateinit var subject: Subject
|
||||
private lateinit var callbackHandler: CallbackHandler
|
||||
private lateinit var securityManager: RPCSecurityManager
|
||||
private lateinit var loginListener: LoginListener
|
||||
private var useSsl: Boolean? = null
|
||||
private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check
|
||||
private lateinit var rpcCertCheck: CertificateChainCheckPolicy.Check
|
||||
private val principals = ArrayList<Principal>()
|
||||
|
||||
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
|
||||
this.subject = subject
|
||||
this.callbackHandler = callbackHandler
|
||||
securityManager = uncheckedCast(options[SECURITY_MANAGER_ARG])
|
||||
loginListener = uncheckedCast(options[LOGIN_LISTENER_ARG])
|
||||
useSsl = options[USE_SSL_ARG] as Boolean
|
||||
val certChainChecks: Map<String, CertificateChainCheckPolicy.Check> = uncheckedCast(options[CERT_CHAIN_CHECKS_ARG])
|
||||
nodeCertCheck = certChainChecks[NODE_ROLE]!!
|
||||
rpcCertCheck = certChainChecks[RPC_ROLE]!!
|
||||
}
|
||||
|
||||
override fun login(): Boolean {
|
||||
val nameCallback = NameCallback("Username: ")
|
||||
val passwordCallback = PasswordCallback("Password: ", false)
|
||||
val certificateCallback = CertificateCallback()
|
||||
try {
|
||||
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
|
||||
} catch (e: IOException) {
|
||||
throw LoginException(e.message)
|
||||
} catch (e: UnsupportedCallbackException) {
|
||||
throw LoginException("${e.message} not available to obtain information from user")
|
||||
}
|
||||
|
||||
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
|
||||
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
|
||||
val certificates = certificateCallback.certificates ?: emptyArray()
|
||||
|
||||
if (rpcCertCheck is CertificateChainCheckPolicy.UsernameMustMatchCommonNameCheck) {
|
||||
(rpcCertCheck as CertificateChainCheckPolicy.UsernameMustMatchCommonNameCheck).username = username
|
||||
}
|
||||
|
||||
log.debug("Logging user in")
|
||||
|
||||
try {
|
||||
val role = determineUserRole(certificates, username, useSsl!!)
|
||||
val validatedUser = when (role) {
|
||||
NodeLoginModule.NODE_ROLE -> {
|
||||
authenticateNode(certificates)
|
||||
NODE_USER
|
||||
}
|
||||
RPC_ROLE -> {
|
||||
authenticateRpcUser(username, Password(password), certificates, useSsl!!)
|
||||
username
|
||||
}
|
||||
else -> throw FailedLoginException("Peer does not belong on our network")
|
||||
}
|
||||
principals += UserPrincipal(validatedUser)
|
||||
|
||||
loginSucceeded = true
|
||||
return loginSucceeded
|
||||
} catch (exception: FailedLoginException) {
|
||||
log.warn("$exception")
|
||||
throw exception
|
||||
}
|
||||
}
|
||||
|
||||
private fun authenticateNode(certificates: Array<X509Certificate>) {
|
||||
nodeCertCheck.checkCertificateChain(certificates)
|
||||
principals += RolePrincipal(NodeLoginModule.NODE_ROLE)
|
||||
}
|
||||
|
||||
private fun authenticateRpcUser(username: String, password: Password, certificates: Array<X509Certificate>, useSsl: Boolean) {
|
||||
if (useSsl) {
|
||||
rpcCertCheck.checkCertificateChain(certificates)
|
||||
// no point in matching username with CN because companies wouldn't want to provide a certificate for each user
|
||||
}
|
||||
securityManager.authenticate(username, password)
|
||||
loginListener(username)
|
||||
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
|
||||
principals += RolePrincipal("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username") // This enables the RPC client to receive responses
|
||||
}
|
||||
|
||||
private fun determineUserRole(certificates: Array<X509Certificate>, username: String, useSsl: Boolean): String? {
|
||||
return when (username) {
|
||||
ArtemisMessagingComponent.NODE_USER -> {
|
||||
require(certificates.isNotEmpty()) { "No TLS?" }
|
||||
NodeLoginModule.NODE_ROLE
|
||||
}
|
||||
else -> {
|
||||
if (useSsl) {
|
||||
require(certificates.isNotEmpty()) { "No TLS?" }
|
||||
}
|
||||
return RPC_ROLE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun commit(): Boolean {
|
||||
val result = loginSucceeded
|
||||
if (result) {
|
||||
subject.principals.addAll(principals)
|
||||
}
|
||||
clear()
|
||||
return result
|
||||
}
|
||||
|
||||
override fun abort(): Boolean {
|
||||
clear()
|
||||
return true
|
||||
}
|
||||
|
||||
override fun logout(): Boolean {
|
||||
subject.principals.removeAll(principals)
|
||||
return true
|
||||
}
|
||||
|
||||
private fun clear() {
|
||||
loginSucceeded = false
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository
|
||||
|
||||
/**
|
||||
* Helper class to dynamically assign security roles to RPC users
|
||||
* on their authentication. This object is plugged into the server
|
||||
* as [SecuritySettingPlugin]. It responds to authentication events
|
||||
* from [NodeLoginModule] by adding the address -> roles association
|
||||
* generated by the given [source], unless already done before.
|
||||
*/
|
||||
internal class RolesAdderOnLogin(val source: (String) -> Pair<String, Set<Role>>) : SecuritySettingPlugin {
|
||||
private lateinit var repository: RolesRepository
|
||||
|
||||
fun onLogin(username: String) {
|
||||
val (address, roles) = source(username)
|
||||
val entry = repository.getMatch(address)
|
||||
if (entry == null || entry.isEmpty()) {
|
||||
repository.addMatch(address, roles.toMutableSet())
|
||||
}
|
||||
}
|
||||
|
||||
override fun setSecurityRepository(repository: RolesRepository) {
|
||||
this.repository = repository
|
||||
}
|
||||
|
||||
override fun stop() = this
|
||||
|
||||
override fun init(options: MutableMap<String, String>?) = this
|
||||
|
||||
override fun getSecurityRoles() = null
|
||||
}
|
||||
|
||||
typealias RolesRepository = HierarchicalRepository<MutableSet<Role>>
|
@ -0,0 +1,131 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.artemis.SecureArtemisConfiguration
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||
import java.nio.file.Path
|
||||
|
||||
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: SSLConfiguration?, useSsl: Boolean) : SecureArtemisConfiguration() {
|
||||
val loginListener: (String) -> Unit
|
||||
|
||||
init {
|
||||
setDirectories(baseDirectory)
|
||||
|
||||
val acceptorConfigurationsSet = mutableSetOf(acceptorConfiguration(address, useSsl, sslOptions))
|
||||
adminAddress?.let {
|
||||
acceptorConfigurationsSet += acceptorConfiguration(adminAddress, true, sslOptions)
|
||||
}
|
||||
acceptorConfigurations = acceptorConfigurationsSet
|
||||
|
||||
queueConfigurations = queueConfigurations()
|
||||
|
||||
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
||||
addressesSettings = mapOf(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||
maxSizeBytes = 10L * maxMessageSize
|
||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
||||
}
|
||||
)
|
||||
|
||||
initialiseSettings(maxMessageSize)
|
||||
|
||||
val nodeInternalRole = Role(NodeLoginModule.NODE_ROLE, true, true, true, true, true, true, true, true)
|
||||
|
||||
val rolesAdderOnLogin = RolesAdderOnLogin { username ->
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#" to setOf(nodeInternalRole, restrictedRole(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username",
|
||||
consume = true,
|
||||
createNonDurableQueue = true,
|
||||
deleteNonDurableQueue = true)
|
||||
)
|
||||
}
|
||||
|
||||
configureAddressSecurity(nodeInternalRole, rolesAdderOnLogin)
|
||||
|
||||
if (jmxEnabled) {
|
||||
enableJmx()
|
||||
}
|
||||
|
||||
loginListener = { username: String -> rolesAdderOnLogin.onLogin(username) }
|
||||
}
|
||||
|
||||
private fun configureAddressSecurity(nodeInternalRole: Role, rolesAdderOnLogin: RolesAdderOnLogin) {
|
||||
securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole)
|
||||
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(NodeLoginModule.RPC_ROLE, send = true))
|
||||
securitySettingPlugins.add(rolesAdderOnLogin)
|
||||
}
|
||||
|
||||
private fun enableJmx() {
|
||||
isJMXManagementEnabled = true
|
||||
isJMXUseBrokerName = true
|
||||
}
|
||||
|
||||
private fun initialiseSettings(maxMessageSize: Int) {
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||
isPersistIDCache = true
|
||||
isPopulateValidatedUser = true
|
||||
journalBufferSize_NIO = maxMessageSize // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
|
||||
journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
||||
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB.
|
||||
}
|
||||
|
||||
private fun queueConfigurations(): List<CoreQueueConfiguration> {
|
||||
return listOf(
|
||||
queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
|
||||
queueConfiguration(
|
||||
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS,
|
||||
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
|
||||
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
|
||||
durable = false
|
||||
),
|
||||
queueConfiguration(
|
||||
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS,
|
||||
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
|
||||
filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION,
|
||||
durable = false
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private fun setDirectories(baseDirectory: Path) {
|
||||
bindingsDirectory = (baseDirectory / "bindings").toString()
|
||||
journalDirectory = (baseDirectory / "journal").toString()
|
||||
largeMessagesDirectory = (baseDirectory / "large-messages").toString()
|
||||
}
|
||||
|
||||
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
|
||||
val configuration = CoreQueueConfiguration()
|
||||
|
||||
configuration.name = name
|
||||
configuration.address = address
|
||||
configuration.filterString = filter
|
||||
configuration.isDurable = durable
|
||||
|
||||
return configuration
|
||||
}
|
||||
|
||||
|
||||
private fun acceptorConfiguration(address: NetworkHostAndPort, enableSsl: Boolean, sslOptions: SSLConfiguration?): TransportConfiguration {
|
||||
return tcpTransport(ConnectionDirection.Inbound(NettyAcceptorFactory::class.java.name), address, sslOptions, enableSsl)
|
||||
}
|
||||
|
||||
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
|
||||
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
|
||||
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
|
||||
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue, manage, browse)
|
||||
}
|
||||
}
|
@ -24,4 +24,8 @@ activeMQServer = {
|
||||
maxRetryIntervalMin = 3
|
||||
}
|
||||
}
|
||||
useAMQPBridges = true
|
||||
useAMQPBridges = true
|
||||
rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
}
|
@ -1,11 +1,13 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
@ -27,24 +29,42 @@ class NodeConfigurationImplTest {
|
||||
assertFalse { configDebugOptions(true, DevModeOptions(true)).shouldCheckCheckpoints() }
|
||||
}
|
||||
|
||||
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?) : NodeConfiguration {
|
||||
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?): NodeConfiguration {
|
||||
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
|
||||
}
|
||||
|
||||
private val testConfiguration = NodeConfigurationImpl(
|
||||
baseDirectory = Paths.get("."),
|
||||
myLegalName = ALICE_NAME,
|
||||
emailAddress = "",
|
||||
keyStorePassword = "cordacadevpass",
|
||||
trustStorePassword = "trustpass",
|
||||
dataSourceProperties = makeTestDataSourceProperties(ALICE_NAME.organisation),
|
||||
rpcUsers = emptyList(),
|
||||
verifierType = VerifierType.InMemory,
|
||||
p2pAddress = NetworkHostAndPort("localhost", 0),
|
||||
rpcAddress = NetworkHostAndPort("localhost", 1),
|
||||
messagingServerAddress = null,
|
||||
notary = null,
|
||||
certificateChainCheckPolicies = emptyList(),
|
||||
devMode = true,
|
||||
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)))
|
||||
private fun testConfiguration(dataSourceProperties: Properties): NodeConfigurationImpl {
|
||||
return testConfiguration.copy(dataSourceProperties = dataSourceProperties)
|
||||
}
|
||||
|
||||
private val testConfiguration = testNodeConfiguration()
|
||||
|
||||
private fun testNodeConfiguration(): NodeConfigurationImpl {
|
||||
val baseDirectory = Paths.get(".")
|
||||
val keyStorePassword = "cordacadevpass"
|
||||
val trustStorePassword = "trustpass"
|
||||
val rpcSettings = NodeRpcSettings(
|
||||
address = NetworkHostAndPort("localhost", 1),
|
||||
adminAddress = NetworkHostAndPort("localhost", 2),
|
||||
standAloneBroker = false,
|
||||
useSsl = false,
|
||||
ssl = SslOptions(baseDirectory / "certificates", keyStorePassword, trustStorePassword))
|
||||
return NodeConfigurationImpl(
|
||||
baseDirectory = baseDirectory,
|
||||
myLegalName = ALICE_NAME,
|
||||
emailAddress = "",
|
||||
keyStorePassword = keyStorePassword,
|
||||
trustStorePassword = trustStorePassword,
|
||||
dataSourceProperties = makeTestDataSourceProperties(ALICE_NAME.organisation),
|
||||
rpcUsers = emptyList(),
|
||||
verifierType = VerifierType.InMemory,
|
||||
p2pAddress = NetworkHostAndPort("localhost", 0),
|
||||
messagingServerAddress = null,
|
||||
notary = null,
|
||||
certificateChainCheckPolicies = emptyList(),
|
||||
devMode = true,
|
||||
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
|
||||
rpcSettings = rpcSettings
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class ArtemisMessagingTests {
|
||||
class ArtemisMessagingTest {
|
||||
companion object {
|
||||
const val TOPIC = "platform.self"
|
||||
}
|
||||
@ -51,7 +51,6 @@ class ArtemisMessagingTests {
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
private val serverPort = freePort()
|
||||
private val rpcPort = freePort()
|
||||
private val identity = generateKeyPair()
|
||||
|
||||
private lateinit var config: NodeConfiguration
|
||||
@ -71,6 +70,7 @@ class ArtemisMessagingTests {
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).exportJMXto
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
||||
@ -183,8 +183,8 @@ class ArtemisMessagingTests {
|
||||
}
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: Int = serverPort, rpc: Int = rpcPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, rpc, networkMapCache, securityManager, maxMessageSize).apply {
|
||||
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, networkMapCache, securityManager, maxMessageSize).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
@ -0,0 +1,215 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.services.Permissions.Companion.all
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.messaging.RPCMessagingClient
|
||||
import net.corda.node.testsupport.withCertificates
|
||||
import net.corda.node.testsupport.withKeyStores
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class ArtemisRpcTests {
|
||||
private val ports: PortAllocation = PortAllocation.RandomFree
|
||||
|
||||
private val user = User("mark", "dadada", setOf(all()))
|
||||
private val users = listOf(user)
|
||||
private val securityManager = RPCSecurityManagerImpl.fromUserList(AuthServiceId("test"), users)
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
|
||||
@Test
|
||||
fun rpc_with_ssl_enabled() {
|
||||
withCertificates { server, client, createSelfSigned, createSignedBy ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
val markCertificate = createSignedBy(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"), rootCertificate)
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["mark"] = markCertificate
|
||||
|
||||
client.keyStore["mark"] = markCertificate
|
||||
client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
|
||||
testSslCommunication(brokerSslOptions, true, clientSslOptions)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_with_ssl_disabled() {
|
||||
withCertificates { server, client, createSelfSigned, _ ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, _ ->
|
||||
// here server is told not to use SSL, and client sslOptions are null (as in, do not use SSL)
|
||||
testSslCommunication(brokerSslOptions, false, null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_with_server_certificate_untrusted_to_client() {
|
||||
withCertificates { server, client, createSelfSigned, createSignedBy ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
val markCertificate = createSignedBy(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"), rootCertificate)
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["mark"] = markCertificate
|
||||
|
||||
client.keyStore["mark"] = markCertificate
|
||||
// here the server certificate is not trusted by the client
|
||||
// client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
|
||||
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_with_no_client_certificate() {
|
||||
withCertificates { server, client, createSelfSigned, createSignedBy ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
val markCertificate = createSignedBy(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"), rootCertificate)
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["mark"] = markCertificate
|
||||
|
||||
// here client keystore is empty
|
||||
// client.keyStore["mark"] = markCertificate
|
||||
client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
|
||||
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_with_no_ssl_on_client_side_and_ssl_on_server_side() {
|
||||
withCertificates { server, client, createSelfSigned, createSignedBy ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
val markCertificate = createSignedBy(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"), rootCertificate)
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["mark"] = markCertificate
|
||||
|
||||
client.keyStore["mark"] = markCertificate
|
||||
client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, _ ->
|
||||
// here client sslOptions are passed null (as in, do not use SSL)
|
||||
testSslCommunication(brokerSslOptions, true, null, clientConnectionSpy = expectExceptionOfType(ActiveMQConnectionTimedOutException::class))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun rpc_client_certificate_untrusted_to_server() {
|
||||
withCertificates { server, client, createSelfSigned, _ ->
|
||||
val rootCertificate = createSelfSigned(CordaX500Name("SystemUsers/Node", "IT", "R3 London", "London", "London", "GB"))
|
||||
// here client's certificate is self-signed, otherwise Artemis allows the connection (the issuing certificate is in the truststore)
|
||||
val markCertificate = createSelfSigned(CordaX500Name("mark", "IT", "R3 London", "London", "London", "GB"))
|
||||
|
||||
// truststore needs to contain root CA for how the driver works...
|
||||
server.keyStore["cordaclienttls"] = rootCertificate
|
||||
server.trustStore["cordaclienttls"] = rootCertificate
|
||||
// here the client certificate is not trusted by the server
|
||||
// server.trustStore["mark"] = markCertificate
|
||||
|
||||
client.keyStore["mark"] = markCertificate
|
||||
client.trustStore["cordaclienttls"] = rootCertificate
|
||||
|
||||
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
|
||||
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun testSslCommunication(brokerSslOptions: SSLConfiguration, useSslForBroker: Boolean, clientSslOptions: SSLConfiguration?, address: NetworkHostAndPort = ports.nextHostAndPort(),
|
||||
adminAddress: NetworkHostAndPort = ports.nextHostAndPort(), baseDirectory: Path = Files.createTempDirectory(null), clientConnectionSpy: (() -> Unit) -> Unit = {}) {
|
||||
val maxMessageSize = 10000
|
||||
val jmxEnabled = false
|
||||
val certificateChainCheckPolicies: List<CertChainPolicyConfig> = listOf()
|
||||
|
||||
val artemisBroker: ArtemisBroker = if (useSslForBroker) {
|
||||
ArtemisRpcBroker.withSsl(address, brokerSslOptions, securityManager, certificateChainCheckPolicies, maxMessageSize, jmxEnabled, baseDirectory)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(address, adminAddress, brokerSslOptions, securityManager, certificateChainCheckPolicies, maxMessageSize, jmxEnabled, baseDirectory)
|
||||
}
|
||||
artemisBroker.use { broker ->
|
||||
broker.start()
|
||||
RPCMessagingClient(brokerSslOptions, broker.addresses.admin, maxMessageSize).use { server ->
|
||||
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
||||
|
||||
val client = RPCClient<TestRpcOps>(tcpTransport(ConnectionDirection.Outbound(), broker.addresses.primary, clientSslOptions))
|
||||
|
||||
clientConnectionSpy {
|
||||
client.start(TestRpcOps::class.java, user.username, user.password).use { connection ->
|
||||
connection.proxy.apply {
|
||||
val greeting = greet("Frodo")
|
||||
assertThat(greeting).isEqualTo("Oh, hello Frodo!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <OPS : RPCOps> RPCMessagingClient.start(ops: OPS, securityManager: RPCSecurityManager, brokerControl: ActiveMQServerControl) {
|
||||
apply {
|
||||
start(ops, securityManager)
|
||||
start2(brokerControl)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <EXCEPTION : Exception> expectExceptionOfType(exceptionType: KClass<EXCEPTION>): (() -> Unit) -> Unit {
|
||||
return { action -> assertThatThrownBy { action.invoke() }.isInstanceOf(exceptionType.java) }
|
||||
}
|
||||
|
||||
interface TestRpcOps : RPCOps {
|
||||
fun greet(name: String): String
|
||||
}
|
||||
|
||||
class TestRpcOpsImpl : TestRpcOps {
|
||||
override fun greet(name: String): String = "Oh, hello $name!"
|
||||
|
||||
override val protocolVersion: Int = 1
|
||||
}
|
||||
}
|
@ -0,0 +1,206 @@
|
||||
package net.corda.node.testsupport
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.node.services.config.SslOptions
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import org.apache.commons.io.FileUtils
|
||||
import sun.security.tools.keytool.CertAndKeyGen
|
||||
import sun.security.x509.X500Name
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyStore
|
||||
import java.security.PrivateKey
|
||||
import java.security.cert.X509Certificate
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.Instant.now
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
import javax.security.auth.x500.X500Principal
|
||||
|
||||
class UnsafeCertificatesFactory(
|
||||
defaults: Defaults = defaults(),
|
||||
private val keyType: String = defaults.keyType,
|
||||
private val signatureAlgorithm: String = defaults.signatureAlgorithm,
|
||||
private val keySize: Int = defaults.keySize,
|
||||
private val certificatesValidityWindow: CertificateValidityWindow = defaults.certificatesValidityWindow,
|
||||
private val keyStoreType: String = defaults.keyStoreType) {
|
||||
|
||||
companion object {
|
||||
private const val KEY_TYPE_RSA = "RSA"
|
||||
private const val SIG_ALG_SHA_RSA = "SHA1WithRSA"
|
||||
private const val KEY_SIZE = 1024
|
||||
private val DEFAULT_DURATION = Duration.of(365, ChronoUnit.DAYS)
|
||||
private const val DEFAULT_KEYSTORE_TYPE = "JKS"
|
||||
|
||||
fun defaults() = Defaults(KEY_TYPE_RSA, SIG_ALG_SHA_RSA, KEY_SIZE, CertificateValidityWindow(now(), DEFAULT_DURATION), DEFAULT_KEYSTORE_TYPE)
|
||||
}
|
||||
|
||||
data class Defaults(
|
||||
val keyType: String,
|
||||
val signatureAlgorithm: String,
|
||||
val keySize: Int,
|
||||
val certificatesValidityWindow: CertificateValidityWindow,
|
||||
val keyStoreType: String)
|
||||
|
||||
fun createSelfSigned(name: X500Name): UnsafeCertificate = createSelfSigned(name, keyType, signatureAlgorithm, keySize, certificatesValidityWindow)
|
||||
|
||||
fun createSelfSigned(name: CordaX500Name) = createSelfSigned(name.asX500Name())
|
||||
|
||||
fun createSignedBy(subject: X500Principal, issuer: UnsafeCertificate): UnsafeCertificate = issuer.createSigned(subject, keyType, signatureAlgorithm, keySize, certificatesValidityWindow)
|
||||
|
||||
fun createSignedBy(name: CordaX500Name, issuer: UnsafeCertificate): UnsafeCertificate = issuer.createSigned(name, keyType, signatureAlgorithm, keySize, certificatesValidityWindow)
|
||||
|
||||
fun newKeyStore(password: String) = UnsafeKeyStore.create(keyStoreType, password)
|
||||
|
||||
fun newKeyStores(keyStorePassword: String, trustStorePassword: String): KeyStores = KeyStores(newKeyStore(keyStorePassword), newKeyStore(trustStorePassword))
|
||||
}
|
||||
|
||||
class KeyStores(val keyStore: UnsafeKeyStore, val trustStore: UnsafeKeyStore) {
|
||||
fun save(directory: Path = Files.createTempDirectory(null)): AutoClosableSSLConfiguration {
|
||||
val keyStoreFile = keyStore.toTemporaryFile("sslkeystore", directory = directory)
|
||||
val trustStoreFile = trustStore.toTemporaryFile("truststore", directory = directory)
|
||||
|
||||
val sslConfiguration = sslConfiguration(directory)
|
||||
|
||||
return object : AutoClosableSSLConfiguration {
|
||||
override val value = sslConfiguration
|
||||
|
||||
override fun close() {
|
||||
keyStoreFile.close()
|
||||
trustStoreFile.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun sslConfiguration(directory: Path) = SslOptions(directory, keyStore.password, trustStore.password)
|
||||
}
|
||||
|
||||
interface AutoClosableSSLConfiguration : AutoCloseable {
|
||||
val value: SslOptions
|
||||
}
|
||||
|
||||
typealias KeyStoreEntry = Pair<String, UnsafeCertificate>
|
||||
|
||||
data class UnsafeKeyStore(private val delegate: KeyStore, val password: String) : Iterable<KeyStoreEntry> {
|
||||
companion object {
|
||||
private const val JKS_TYPE = "JKS"
|
||||
|
||||
fun create(type: String, password: String) = UnsafeKeyStore(newKeyStore(type, password), password)
|
||||
|
||||
fun createJKS(password: String) = create(JKS_TYPE, password)
|
||||
}
|
||||
|
||||
operator fun plus(entry: KeyStoreEntry) = set(entry.first, entry.second)
|
||||
|
||||
override fun iterator(): Iterator<Pair<String, UnsafeCertificate>> = delegate.aliases().toList().map { alias -> alias to get(alias) }.iterator()
|
||||
|
||||
operator fun get(alias: String): UnsafeCertificate {
|
||||
return when {
|
||||
delegate.isKeyEntry(alias) -> delegate.getCertificateAndKeyPair(alias, password).unsafe()
|
||||
else -> UnsafeCertificate(delegate.getX509Certificate(alias), null)
|
||||
}
|
||||
}
|
||||
|
||||
operator fun set(alias: String, certificate: UnsafeCertificate) {
|
||||
delegate.setCertificateEntry(alias, certificate.value)
|
||||
delegate.setKeyEntry(alias, certificate.privateKey, password.toCharArray(), arrayOf(certificate.value))
|
||||
}
|
||||
|
||||
fun save(path: Path) = delegate.save(path, password)
|
||||
|
||||
fun toTemporaryFile(fileName: String, fileExtension: String? = delegate.type.toLowerCase(), directory: Path): TemporaryFile {
|
||||
return TemporaryFile("$fileName.$fileExtension", directory).also { save(it.path) }
|
||||
}
|
||||
}
|
||||
|
||||
class TemporaryFile(fileName: String, val directory: Path) : AutoCloseable {
|
||||
private val file = (directory / fileName).toFile()
|
||||
init {
|
||||
file.createNewFile()
|
||||
file.deleteOnExit()
|
||||
}
|
||||
|
||||
val path: Path = file.toPath().toAbsolutePath()
|
||||
|
||||
override fun close() = FileUtils.forceDelete(file)
|
||||
}
|
||||
|
||||
data class UnsafeCertificate(val value: X509Certificate, val privateKey: PrivateKey?) {
|
||||
val keyPair = KeyPair(value.publicKey, privateKey)
|
||||
|
||||
val principal: X500Principal get() = value.subjectX500Principal
|
||||
|
||||
val issuer: X500Principal get() = value.issuerX500Principal
|
||||
|
||||
fun createSigned(subject: X500Principal, keyType: String, signatureAlgorithm: String, keySize: Int, certificatesValidityWindow: CertificateValidityWindow): UnsafeCertificate {
|
||||
val keyGen = keyGen(keyType, signatureAlgorithm, keySize)
|
||||
|
||||
return UnsafeCertificate(X509Utilities.createCertificate(
|
||||
certificateType = CertificateType.TLS,
|
||||
issuer = value.subjectX500Principal,
|
||||
issuerKeyPair = keyPair,
|
||||
validityWindow = certificatesValidityWindow.datePair,
|
||||
subject = subject,
|
||||
subjectPublicKey = keyGen.publicKey
|
||||
), keyGen.privateKey)
|
||||
}
|
||||
|
||||
fun createSigned(name: CordaX500Name, keyType: String, signatureAlgorithm: String, keySize: Int, certificatesValidityWindow: CertificateValidityWindow) = createSigned(name.x500Principal, keyType, signatureAlgorithm, keySize, certificatesValidityWindow)
|
||||
}
|
||||
|
||||
data class CertificateValidityWindow(val from: Instant, val to: Instant) {
|
||||
constructor(from: Instant, duration: Duration) : this(from, from.plus(duration))
|
||||
|
||||
val duration = Duration.between(from, to)!!
|
||||
|
||||
val datePair = Date.from(from) to Date.from(to)
|
||||
}
|
||||
|
||||
private fun createSelfSigned(name: X500Name, keyType: String, signatureAlgorithm: String, keySize: Int, certificatesValidityWindow: CertificateValidityWindow): UnsafeCertificate {
|
||||
val keyGen = keyGen(keyType, signatureAlgorithm, keySize)
|
||||
return UnsafeCertificate(keyGen.getSelfCertificate(name, certificatesValidityWindow.duration.toMillis()), keyGen.privateKey)
|
||||
}
|
||||
|
||||
private fun CordaX500Name.asX500Name(): X500Name = X500Name.asX500Name(x500Principal)
|
||||
|
||||
private fun CertificateAndKeyPair.unsafe() = UnsafeCertificate(certificate, keyPair.private)
|
||||
|
||||
private fun keyGen(keyType: String, signatureAlgorithm: String, keySize: Int): CertAndKeyGen {
|
||||
val keyGen = CertAndKeyGen(keyType, signatureAlgorithm)
|
||||
keyGen.generate(keySize)
|
||||
return keyGen
|
||||
}
|
||||
|
||||
private fun newKeyStore(type: String, password: String): KeyStore {
|
||||
val keyStore = KeyStore.getInstance(type)
|
||||
// Loading creates the store, can't do anything with it until it's loaded
|
||||
keyStore.load(null, password.toCharArray())
|
||||
|
||||
return keyStore
|
||||
}
|
||||
|
||||
fun withKeyStores(server: KeyStores, client: KeyStores, action: (brokerSslOptions: SslOptions, clientSslOptions: SslOptions) -> Unit) {
|
||||
val serverDir = Files.createTempDirectory(null)
|
||||
FileUtils.forceDeleteOnExit(serverDir.toFile())
|
||||
|
||||
val clientDir = Files.createTempDirectory(null)
|
||||
FileUtils.forceDeleteOnExit(clientDir.toFile())
|
||||
|
||||
server.save(serverDir).use { serverSslConfiguration ->
|
||||
client.save(clientDir).use { clientSslConfiguration ->
|
||||
action(serverSslConfiguration.value, clientSslConfiguration.value)
|
||||
}
|
||||
}
|
||||
FileUtils.deleteQuietly(clientDir.toFile())
|
||||
FileUtils.deleteQuietly(serverDir.toFile())
|
||||
}
|
||||
|
||||
fun withCertificates(factoryDefaults: UnsafeCertificatesFactory.Defaults = UnsafeCertificatesFactory.defaults(), action: (server: KeyStores, client: KeyStores, createSelfSigned: (name: CordaX500Name) -> UnsafeCertificate, createSignedBy: (name: CordaX500Name, issuer: UnsafeCertificate) -> UnsafeCertificate) -> Unit) {
|
||||
val factory = UnsafeCertificatesFactory(factoryDefaults)
|
||||
val server = factory.newKeyStores("serverKeyStorePass", "serverTrustKeyStorePass")
|
||||
val client = factory.newKeyStores("clientKeyStorePass", "clientTrustKeyStorePass")
|
||||
action(server, client, factory::createSelfSigned, factory::createSignedBy)
|
||||
}
|
@ -101,7 +101,7 @@ class IRSDemoTest {
|
||||
}
|
||||
|
||||
private fun getFixingDateObservable(config: NodeConfiguration): Observable<LocalDate?> {
|
||||
val client = CordaRPCClient(config.rpcAddress!!)
|
||||
val client = CordaRPCClient(config.rpcOptions.address!!)
|
||||
val proxy = client.start("user", "password").proxy
|
||||
val vaultUpdates = proxy.vaultTrackBy<InterestRateSwap.State>().updates
|
||||
|
||||
|
@ -110,7 +110,7 @@ data class SpringBootDriverDSL(private val driverDSL: DriverDSLImpl) : InternalD
|
||||
arguments = listOf(
|
||||
"--base-directory", handle.configuration.baseDirectory.toString(),
|
||||
"--server.port=${handle.webAddress.port}",
|
||||
"--corda.host=${handle.configuration.rpcAddress}",
|
||||
"--corda.host=${handle.configuration.rpcOptions.address}",
|
||||
"--corda.user=${handle.configuration.rpcUsers.first().username}",
|
||||
"--corda.password=${handle.configuration.rpcUsers.first().password}"
|
||||
),
|
||||
|
@ -40,11 +40,11 @@ class TraderDemoTest {
|
||||
).map { (it.getOrThrow() as NodeHandle.InProcess).node }
|
||||
nodeA.registerInitiatedFlow(BuyerFlow::class.java)
|
||||
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {
|
||||
val client = CordaRPCClient(it.internals.configuration.rpcAddress!!)
|
||||
val client = CordaRPCClient(it.internals.configuration.rpcOptions.address!!)
|
||||
client.start(demoUser.username, demoUser.password).proxy
|
||||
}
|
||||
val nodeBankRpc = let {
|
||||
val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!)
|
||||
val client = CordaRPCClient(bankNode.internals.configuration.rpcOptions.address!!)
|
||||
client.start(bankUser.username, bankUser.password).proxy
|
||||
}
|
||||
|
||||
|
@ -14,9 +14,10 @@ import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.DriverDSLImpl
|
||||
import net.corda.testing.node.internal.genericDriver
|
||||
import net.corda.testing.node.internal.getTimestampAsDirectoryName
|
||||
@ -32,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
||||
data class NotaryHandle(val identity: Party, val validating: Boolean, val nodeHandles: CordaFuture<List<NodeHandle>>)
|
||||
|
||||
@DoNotImplement
|
||||
sealed class NodeHandle {
|
||||
sealed class NodeHandle : AutoCloseable {
|
||||
abstract val nodeInfo: NodeInfo
|
||||
/**
|
||||
* Interface to the node's RPC system. The first RPC user will be used to login if are any, otherwise a default one
|
||||
@ -48,6 +49,11 @@ sealed class NodeHandle {
|
||||
*/
|
||||
abstract fun stop()
|
||||
|
||||
/**
|
||||
* Closes and stops the node.
|
||||
*/
|
||||
override fun close() = stop()
|
||||
|
||||
data class OutOfProcess(
|
||||
override val nodeInfo: NodeInfo,
|
||||
override val rpc: CordaRPCOps,
|
||||
@ -87,7 +93,13 @@ sealed class NodeHandle {
|
||||
}
|
||||
}
|
||||
|
||||
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
|
||||
/**
|
||||
* Connects to node through RPC.
|
||||
*
|
||||
* @param sslConfiguration specifies SSL options.
|
||||
*/
|
||||
@JvmOverloads
|
||||
fun rpcClientToNode(sslConfiguration: SSLConfiguration? = null): CordaRPCClient = CordaRPCClient(configuration.rpcOptions.address!!, sslConfiguration = sslConfiguration)
|
||||
}
|
||||
|
||||
data class WebserverHandle(
|
||||
|
@ -145,14 +145,18 @@ class DriverDSLImpl(
|
||||
}
|
||||
|
||||
private fun establishRpc(config: NodeConfig, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
|
||||
val rpcAddress = config.corda.rpcAddress!!
|
||||
val client = CordaRPCClient(rpcAddress)
|
||||
val rpcAddress = config.corda.rpcOptions.address!!
|
||||
val client = if (config.corda.rpcOptions.useSsl) {
|
||||
CordaRPCClient(rpcAddress, sslConfiguration = config.corda.rpcOptions.sslConfig)
|
||||
} else {
|
||||
CordaRPCClient(rpcAddress)
|
||||
}
|
||||
val connectionFuture = poll(executorService, "RPC connection") {
|
||||
try {
|
||||
config.corda.rpcUsers[0].run { client.start(username, password) }
|
||||
} catch (e: Exception) {
|
||||
if (processDeathFuture.isDone) throw e
|
||||
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
|
||||
log.error("Exception while connecting to RPC, retrying to connect at $rpcAddress", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
@ -202,6 +206,7 @@ class DriverDSLImpl(
|
||||
maximumHeapSize: String = "200m",
|
||||
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture<NodeHandle> {
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
val rpcAdminAddress = portAllocation.nextHostAndPort()
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
|
||||
val czUrlConfig = if (compatibilityZone != null) mapOf("compatibilityZoneURL" to compatibilityZone.url.toString()) else emptyMap()
|
||||
@ -211,7 +216,8 @@ class DriverDSLImpl(
|
||||
configOverrides = configOf(
|
||||
"myLegalName" to name.toString(),
|
||||
"p2pAddress" to p2pAddress.toString(),
|
||||
"rpcAddress" to rpcAddress.toString(),
|
||||
"rpcSettings.address" to rpcAddress.toString(),
|
||||
"rpcSettings.adminAddress" to rpcAdminAddress.toString(),
|
||||
"webAddress" to webAddress.toString(),
|
||||
"useTestClock" to useTestClock,
|
||||
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
|
||||
@ -312,7 +318,23 @@ class DriverDSLImpl(
|
||||
private fun startCordformNode(cordform: CordformNode, localNetworkMap: LocalNetworkMap): CordaFuture<NodeHandle> {
|
||||
val name = CordaX500Name.parse(cordform.name)
|
||||
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
|
||||
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
|
||||
val rpcAddress = if (cordform.rpcAddress == null) {
|
||||
val overrides = mutableMapOf<String, Any>("rpcSettings.address" to portAllocation.nextHostAndPort().toString())
|
||||
cordform.config.apply {
|
||||
if (!hasPath("rpcSettings.useSsl") || !getBoolean("rpcSettings.useSsl")) {
|
||||
overrides += "rpcSettings.adminAddress" to portAllocation.nextHostAndPort().toString()
|
||||
}
|
||||
}
|
||||
overrides
|
||||
} else {
|
||||
val overrides = mutableMapOf<String, Any>()
|
||||
cordform.config.apply {
|
||||
if ((!hasPath("rpcSettings.useSsl") || !getBoolean("rpcSettings.useSsl")) && !hasPath("rpcSettings.adminAddress")) {
|
||||
overrides += "rpcSettings.adminAddress" to portAllocation.nextHostAndPort().toString()
|
||||
}
|
||||
}
|
||||
overrides
|
||||
}
|
||||
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
|
||||
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
|
||||
val rpcUsers = cordform.rpcUsers
|
||||
|
@ -69,7 +69,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
|
||||
val portNotBoundChecks = nodes.flatMap {
|
||||
listOf(
|
||||
it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) },
|
||||
it.internals.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }
|
||||
it.internals.configuration.rpcOptions.address?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }
|
||||
)
|
||||
}.filterNotNull()
|
||||
nodes.clear()
|
||||
@ -85,7 +85,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
|
||||
rpcUsers: List<User> = emptyList(),
|
||||
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
|
||||
val baseDirectory = baseDirectory(legalName).createDirectories()
|
||||
val localPort = getFreeLocalPorts("localhost", 2)
|
||||
val localPort = getFreeLocalPorts("localhost", 3)
|
||||
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
|
||||
val config = ConfigHelper.loadConfig(
|
||||
baseDirectory = baseDirectory,
|
||||
@ -93,7 +93,8 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
|
||||
configOverrides = configOf(
|
||||
"myLegalName" to legalName.toString(),
|
||||
"p2pAddress" to p2pAddress,
|
||||
"rpcAddress" to localPort[1].toString(),
|
||||
"rpcSettings.address" to localPort[1].toString(),
|
||||
"rpcSettings.adminAddress" to localPort[2].toString(),
|
||||
"rpcUsers" to rpcUsers.map { it.toConfig().root().unwrapped() }
|
||||
) + configOverrides
|
||||
)
|
||||
|
@ -4,7 +4,9 @@ import com.nhaarman.mockito_kotlin.doAnswer
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.Crypto.generateKeyPair
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.SslOptions
|
||||
import net.corda.node.services.config.configureDevKeyAndTrustStores
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.createDevNodeCa
|
||||
@ -115,3 +117,22 @@ fun createDevNodeCaCertPath(
|
||||
|
||||
/** Application of [doAnswer] that gets a value from the given [map] using the arg at [argIndex] as key. */
|
||||
fun doLookup(map: Map<*, *>, argIndex: Int = 0) = doAnswer { map[it.arguments[argIndex]] }
|
||||
|
||||
fun SslOptions.useSslRpcOverrides(): Map<String, String> {
|
||||
return mapOf(
|
||||
"rpcSettings.useSsl" to "true",
|
||||
"rpcSettings.ssl.certificatesDirectory" to certificatesDirectory.toString(),
|
||||
"rpcSettings.ssl.keyStorePassword" to keyStorePassword,
|
||||
"rpcSettings.ssl.trustStorePassword" to trustStorePassword
|
||||
)
|
||||
}
|
||||
|
||||
fun SslOptions.noSslRpcOverrides(rpcAdminAddress: NetworkHostAndPort): Map<String, String> {
|
||||
return mapOf(
|
||||
"rpcSettings.adminAddress" to rpcAdminAddress.toString(),
|
||||
"rpcSettings.useSsl" to "false",
|
||||
"rpcSettings.ssl.certificatesDirectory" to certificatesDirectory.toString(),
|
||||
"rpcSettings.ssl.keyStorePassword" to keyStorePassword,
|
||||
"rpcSettings.ssl.trustStorePassword" to trustStorePassword
|
||||
)
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ class NodeConfigTest {
|
||||
val fullConfig = nodeConfig.parseAsNodeConfiguration()
|
||||
|
||||
assertEquals(myLegalName, fullConfig.myLegalName)
|
||||
assertEquals(localPort(40002), fullConfig.rpcAddress)
|
||||
assertEquals(localPort(40002), fullConfig.rpcOptions.address)
|
||||
assertEquals(localPort(10001), fullConfig.p2pAddress)
|
||||
assertEquals(listOf(user("jenny")), fullConfig.rpcUsers)
|
||||
assertThat(fullConfig.dataSourceProperties["dataSource.url"] as String).contains("AUTO_SERVER_PORT=30001")
|
||||
|
@ -83,7 +83,7 @@ class ExplorerSimulation(private val options: OptionSet) {
|
||||
issuerNodeUSD = issuerUSD.get()
|
||||
|
||||
arrayOf(notaryNode, aliceNode, bobNode, issuerNodeGBP, issuerNodeUSD).forEach {
|
||||
println("${it.nodeInfo.legalIdentities.first()} started on ${it.configuration.rpcAddress}")
|
||||
println("${it.nodeInfo.legalIdentities.first()} started on ${it.configuration.rpcOptions.address}")
|
||||
}
|
||||
|
||||
when {
|
||||
|
@ -2,8 +2,8 @@ package net.corda.webserver
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.getValue
|
||||
import java.nio.file.Path
|
||||
|
||||
@ -16,7 +16,15 @@ class WebServerConfig(override val baseDirectory: Path, val config: Config) : No
|
||||
val exportJMXto: String get() = "http"
|
||||
val useHTTPS: Boolean by config
|
||||
val myLegalName: String by config
|
||||
val rpcAddress: NetworkHostAndPort by config
|
||||
val rpcAddress: NetworkHostAndPort by lazy {
|
||||
if (config.hasPath("rpcSettings.address")) {
|
||||
return@lazy NetworkHostAndPort.parse(config.getString("rpcSettings.address"))
|
||||
}
|
||||
if (config.hasPath("rpcAddress")) {
|
||||
return@lazy NetworkHostAndPort.parse(config.getString("rpcAddress"))
|
||||
}
|
||||
throw Exception("Missing rpc address property. Either 'rpcSettings' or 'rpcAddress' must be specified.")
|
||||
}
|
||||
val webAddress: NetworkHostAndPort by config
|
||||
val rpcUsers: List<User> by config
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user