Add node-api, split minimal node functionality, OutOfProcessTransactionVerifierService

This commit is contained in:
Andras Slemmer
2017-03-08 17:21:43 +00:00
committed by exfalso
parent 0280299104
commit 48952dfc02
46 changed files with 1390 additions and 144 deletions

View File

@ -3,7 +3,6 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort
import net.corda.client.rpc.CordaRPCClientImpl
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.crypto.Party
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
@ -23,6 +22,7 @@ import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.configureTestSSL
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest
@ -82,7 +82,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
@Test
fun `create queue for peer which has not been communciated with`() {
fun `create queue for peer which has not been communicated with`() {
val bob = startNode("Bob").getOrThrow()
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}")
}

View File

@ -5,8 +5,8 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.*
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.ThreadBox
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.ThreadBox
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flatMap
@ -19,7 +19,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.LOGS_DIRECTORY_NAME
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.config.VerifierType
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
@ -68,6 +68,7 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
verifierType: VerifierType = VerifierType.InMemory,
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle>
/**
@ -83,6 +84,7 @@ interface DriverDSLExposedInterface {
notaryName: String,
clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type,
verifierType: VerifierType = VerifierType.InMemory,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
/**
@ -344,7 +346,6 @@ class DriverDSL(
val shutdownManager = ShutdownManager(executorService)
class State {
val clients = LinkedList<NodeMessagingClient>()
val processes = ArrayList<ListenableFuture<Process>>()
}
@ -373,9 +374,6 @@ class DriverDSL(
}
override fun shutdown() {
state.locked {
clients.forEach(NodeMessagingClient::stop)
}
shutdownManager.shutdown()
// Check that we shut down properly
@ -396,8 +394,13 @@ class DriverDSL(
}
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
override fun startNode(
providedName: String?,
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>
): ListenableFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
@ -422,7 +425,8 @@ class DriverDSL(
"password" to it.password,
"permissions" to it.permissions
)
}
},
"verifierType" to verifierType.name
) + customOverrides
val configuration = FullNodeConfiguration(
@ -450,6 +454,7 @@ class DriverDSL(
notaryName: String,
clusterSize: Int,
type: ServiceType,
verifierType: VerifierType,
rpcUsers: List<User>
): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { "Notary Node $it" }
@ -461,12 +466,12 @@ class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
// All other nodes will join the cluster
val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))
startNode(it, advertisedService, rpcUsers, configOverride)
startNode(it, advertisedService, rpcUsers, verifierType, configOverride)
}
return firstNotaryFuture.flatMap { firstNotary ->

View File

@ -118,6 +118,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val clock: Clock = platformClock
override val myInfo: NodeInfo get() = info
override val schemaService: SchemaService get() = schemas
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
@ -154,6 +155,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var keyManagement: KeyManagementService
var inNodeNetworkMapService: NetworkMapService? = null
var inNodeNotaryService: NotaryService? = null
lateinit var txVerifierService: TransactionVerifierService
lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
@ -252,6 +254,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
net = makeMessagingService()
schemas = makeSchemaService()
vault = makeVaultService(configuration.dataSourceProperties)
txVerifierService = makeTransactionVerifierService()
info = makeInfo()
identity = makeIdentityService()
@ -478,6 +481,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected open fun makeSchemaService(): SchemaService = NodeSchemaService()
protected abstract fun makeTransactionVerifierService() : TransactionVerifierService
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()

View File

@ -56,6 +56,7 @@ class Node(override val configuration: FullNodeConfiguration,
override val log: Logger get() = logger
override val version: Version get() = nodeVersionInfo.version
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
override fun makeTransactionVerifierService() = (net as NodeMessagingClient).verifierService
// DISCUSSION
//
@ -136,7 +137,8 @@ class Node(override val configuration: FullNodeConfiguration,
myIdentityOrNullIfNetworkMapService,
serverThread,
database,
networkMapRegistrationFuture)
networkMapRegistrationFuture,
services.monitoringService)
}
private fun makeLocalMessageBroker(): HostAndPort {

View File

@ -7,15 +7,13 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRenderOptions
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.copyTo
import net.corda.core.createDirectories
import net.corda.core.crypto.X509Utilities
import net.corda.core.div
import net.corda.core.exists
import net.corda.core.utilities.loggerFor
import java.net.URL
import java.nio.file.Files
import net.corda.nodeapi.config.SSLConfiguration
import java.nio.file.Path
object ConfigHelper {

View File

@ -2,25 +2,29 @@ package net.corda.node.services.config
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.getListOrElse
import net.corda.nodeapi.config.getOrElse
import net.corda.nodeapi.config.getValue
import net.corda.core.div
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.internal.Node
import net.corda.node.serialization.NodeClock
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.TestClock
import net.corda.nodeapi.User
import net.corda.nodeapi.config.getListOrElse
import net.corda.nodeapi.config.getOrElse
import net.corda.nodeapi.config.getValue
import java.net.URL
import java.nio.file.Path
import java.util.*
enum class VerifierType {
InMemory,
OutOfProcess
}
interface NodeConfiguration : SSLConfiguration {
interface NodeConfiguration : net.corda.nodeapi.config.SSLConfiguration {
val baseDirectory: Path
override val certificatesDirectory: Path get() = baseDirectory / "certificates"
val myLegalName: String
@ -32,6 +36,8 @@ interface NodeConfiguration : SSLConfiguration {
val rpcUsers: List<User> get() = emptyList()
val devMode: Boolean
val certificateSigningService: URL
val certificateChainCheckPolicies: Map<String, CertificateChainCheckPolicy>
val verifierType: VerifierType
}
/**
@ -61,6 +67,10 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet()
User(username, password, permissions)
}
override val certificateChainCheckPolicies = config.getOptionalConfig("certificateChainCheckPolicies")?.run {
entrySet().associateByTo(HashMap(), { it.key }, { parseCertificateChainCheckPolicy(getConfig(it.key)) })
} ?: emptyMap<String, CertificateChainCheckPolicy>()
override val verifierType: VerifierType by config
val useHTTPS: Boolean by config
val p2pAddress: HostAndPort by config
val rpcAddress: HostAndPort? by config
@ -90,4 +100,15 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
}
}
private fun parseCertificateChainCheckPolicy(config: Config): CertificateChainCheckPolicy {
val policy = config.getString("policy")
return when (policy) {
"Any" -> CertificateChainCheckPolicy.Any
"RootMustMatch" -> CertificateChainCheckPolicy.RootMustMatch
"LeafMustMatch" -> CertificateChainCheckPolicy.LeafMustMatch
"MustContainOneOf" -> CertificateChainCheckPolicy.MustContainOneOf(config.getStringList("trustedAliases").toSet())
else -> throw IllegalArgumentException("Invalid certificate chain check policy $policy")
}
}
private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null

View File

@ -22,13 +22,11 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.expectedOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
@ -51,8 +49,8 @@ import org.bouncycastle.asn1.x500.X500Name
import rx.Subscription
import java.io.IOException
import java.math.BigInteger
import java.security.KeyStore
import java.security.Principal
import java.security.PublicKey
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
@ -67,6 +65,7 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule
import javax.security.cert.CertificateException
import javax.security.cert.X509Certificate
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
@ -201,6 +200,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* 1. The node itself. It is given full access to all valid queues.
* 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue.
* 3. RPC users. These are only given sufficient access to perform RPC with us.
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
*/
private fun ConfigurationImpl.configureAddressSecurity() {
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
@ -214,6 +214,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
}
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.*"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -224,9 +226,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val rootCAPublicKey = X509Utilities
.loadCertificateFromKeyStore(config.trustStoreFile, config.trustStorePassword, CORDA_ROOT_CA)
.publicKey
val ourCertificate = X509Utilities
.loadCertificateFromKeyStore(config.keyStoreFile, config.keyStorePassword, CORDA_CLIENT_CA)
val ourSubjectDN = X500Name(ourCertificate.subjectDN.name)
@ -234,13 +233,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
require(ourSubjectDN.commonName == config.myLegalName) {
"Legal name does not match with our subject CN: $ourSubjectDN"
}
val defaultCertPolicies = mapOf(
PEER_ROLE to CertificateChainCheckPolicy.RootMustMatch,
NODE_ROLE to CertificateChainCheckPolicy.LeafMustMatch,
VERIFIER_ROLE to CertificateChainCheckPolicy.RootMustMatch
)
val keyStore = X509Utilities.loadKeyStore(config.keyStoreFile, config.keyStorePassword)
val trustStore = X509Utilities.loadKeyStore(config.trustStoreFile, config.trustStorePassword)
val certChecks = defaultCertPolicies.mapValues {
(config.certificateChainCheckPolicies[it.key] ?: it.value).createCheck(keyStore, trustStore)
}
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(
RPCUserService::class.java.name to userService,
CORDA_ROOT_CA to rootCAPublicKey,
CORDA_CLIENT_CA to ourCertificate.publicKey)
NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
@ -448,6 +456,66 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
}
}
sealed class CertificateChainCheckPolicy {
@FunctionalInterface
interface Check {
fun checkCertificateChain(theirChain: Array<X509Certificate>)
}
abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check
object Any : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
}
}
}
}
object RootMustMatch : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val rootPublicKey = trustStore.getCertificate(CORDA_ROOT_CA).publicKey
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
val theirRoot = theirChain.last().publicKey
if (rootPublicKey != theirRoot) {
throw CertificateException("Root certificate mismatch, their root = $theirRoot")
}
}
}
}
}
object LeafMustMatch : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val ourPublicKey = keyStore.getCertificate(CORDA_CLIENT_CA).publicKey
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
val theirLeaf = theirChain.first().publicKey
if (ourPublicKey != theirLeaf) {
throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf")
}
}
}
}
}
class MustContainOneOf(val trustedAliases: Set<String>) : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
if (!theirChain.any { it.publicKey in trustedPublicKeys }) {
throw CertificateException("Their certificate chain contained none of the trusted ones")
}
}
}
}
}
}
/**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
@ -465,6 +533,9 @@ class NodeLoginModule : LoginModule {
const val PEER_ROLE = "SystemRoles/Peer"
const val NODE_ROLE = "SystemRoles/Node"
const val RPC_ROLE = "SystemRoles/RPC"
const val VERIFIER_ROLE = "SystemRoles/Verifier"
const val CERT_CHAIN_CHECKS_OPTION_NAME = "CertChainChecks"
val log = loggerFor<NodeLoginModule>()
}
@ -473,23 +544,26 @@ class NodeLoginModule : LoginModule {
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var userService: RPCUserService
private lateinit var ourRootCAPublicKey: PublicKey
private lateinit var ourPublicKey: PublicKey
private lateinit var peerCertCheck: CertificateChainCheckPolicy.Check
private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check
private lateinit var verifierCertCheck: CertificateChainCheckPolicy.Check
private val principals = ArrayList<Principal>()
@Suppress("UNCHECKED_CAST")
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
userService = options[RPCUserService::class.java.name] as RPCUserService
ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey
ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey
val certChainChecks = options[CERT_CHAIN_CHECKS_OPTION_NAME] as Map<String, CertificateChainCheckPolicy.Check>
peerCertCheck = certChainChecks[PEER_ROLE]!!
nodeCertCheck = certChainChecks[NODE_ROLE]!!
verifierCertCheck = certChainChecks[VERIFIER_ROLE]!!
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback()
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
} catch (e: IOException) {
@ -504,32 +578,38 @@ class NodeLoginModule : LoginModule {
log.info("Processing login for $username")
val validatedUser = when (determineUserRole(certificates, username)) {
PEER_ROLE -> authenticatePeer(certificates)
NODE_ROLE -> authenticateNode(certificates)
RPC_ROLE -> authenticateRpcUser(password, username)
else -> throw FailedLoginException("Peer does not belong on our network")
}
principals += UserPrincipal(validatedUser)
try {
val validatedUser = when (determineUserRole(certificates, username)) {
PEER_ROLE -> authenticatePeer(certificates)
NODE_ROLE -> authenticateNode(certificates)
VERIFIER_ROLE -> authenticateVerifier(certificates)
RPC_ROLE -> authenticateRpcUser(password, username)
else -> throw FailedLoginException("Peer does not belong on our network")
}
principals += UserPrincipal(validatedUser)
loginSucceeded = true
return loginSucceeded
loginSucceeded = true
return loginSucceeded
} catch (exception: FailedLoginException) {
log.warn("$exception")
throw exception
}
}
private fun authenticateNode(certificates: Array<X509Certificate>): String {
val peerCertificate = certificates.first()
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
nodeCertCheck.checkCertificateChain(certificates)
principals += RolePrincipal(NODE_ROLE)
return peerCertificate.subjectDN.name
return certificates.first().subjectDN.name
}
private fun authenticateVerifier(certificates: Array<X509Certificate>): String {
verifierCertCheck.checkCertificateChain(certificates)
principals += RolePrincipal(VERIFIER_ROLE)
return certificates.first().subjectDN.name
}
private fun authenticatePeer(certificates: Array<X509Certificate>): String {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
peerCertCheck.checkCertificateChain(certificates)
principals += RolePrincipal(PEER_ROLE)
return certificates.first().subjectDN.name
}
@ -547,14 +627,28 @@ class NodeLoginModule : LoginModule {
}
private fun determineUserRole(certificates: Array<X509Certificate>?, username: String): String? {
return if (username == PEER_USER || username == NODE_USER) {
certificates ?: throw FailedLoginException("No TLS?")
if (username == PEER_USER) PEER_ROLE else NODE_ROLE
} else if (certificates == null) {
// Assume they're an RPC user if its from a non-ssl connection
RPC_ROLE
} else {
null
fun requireTls() = require(certificates != null) { "No TLS?" }
return when (username) {
PEER_USER -> {
requireTls()
PEER_ROLE
}
NODE_USER -> {
requireTls()
NODE_ROLE
}
VerifierApi.VERIFIER_USERNAME -> {
requireTls()
VERIFIER_ROLE
}
else -> {
// Assume they're an RPC user if its from a non-ssl connection
if (certificates == null) {
RPC_ROLE
} else {
null
}
}
}
}

View File

@ -7,19 +7,29 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.messaging.*
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque
import net.corda.core.success
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString
@ -33,6 +43,7 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -62,7 +73,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val myIdentity: CompositeKey?,
val nodeExecutor: AffinityExecutor,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService
) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
private val log = loggerFor<NodeMessagingClient>()
@ -75,6 +88,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private val nodeVersionProperty = SimpleString("node-version")
private val nodeVendorProperty = SimpleString("node-vendor")
private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
}
private class InnerState {
@ -88,6 +102,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
var verificationResponseConsumer: ClientConsumer? = null
}
val verifierService = when (config.verifierType) {
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
}
/** A registration to handle messages of different types */
@ -163,6 +182,19 @@ class NodeMessagingClient(override val config: NodeConfiguration,
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)
fun checkVerifierCount() {
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
}
}
if (config.verifierType == VerifierType.OutOfProcess) {
createQueueIfAbsent(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)
createQueueIfAbsent(verifierResponseAddress)
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
}
}
}
@ -224,6 +256,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
check(!running) { "run can't be called twice" }
running = true
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor)
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
p2pConsumer!!
}
@ -463,6 +496,23 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
return object : OutOfProcessTransactionVerifierService(monitoringService) {
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {
messagingExecutor.fetchFrom {
state.locked {
val message = session!!.createMessage(false)
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
request.writeToClientMessage(message)
producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
}
}
}
}
}
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.Node -> partyInfo.node.address

View File

@ -0,0 +1,18 @@
package net.corda.node.services.transactions
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import java.util.concurrent.Executors
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService {
private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers))
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
return workerPool.submit {
transaction.verify()
}
}
}

View File

@ -0,0 +1,70 @@
package net.corda.node.services.transactions
import com.codahale.metrics.Gauge
import com.codahale.metrics.Timer
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.MonitoringService
import net.corda.nodeapi.VerifierApi
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import java.util.concurrent.ConcurrentHashMap
abstract class OutOfProcessTransactionVerifierService(
val monitoringService: MonitoringService
) : SingletonSerializeAsToken(), TransactionVerifierService {
companion object {
val log = loggerFor<OutOfProcessTransactionVerifierService>()
}
private data class VerificationHandle(
val transactionId: SecureHash,
val resultFuture: SettableFuture<Unit>,
val durationTimerContext: Timer.Context
)
private val verificationHandles = ConcurrentHashMap<Long, VerificationHandle>()
// Metrics
private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name"
private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration"))
private val successMeter = monitoringService.metrics.meter(metric("Verification.Success"))
private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure"))
class VerificationResultForUnknownTransaction(nonce: Long) :
Exception("Verification result arrived for unknown transaction nonce $nonce")
fun start(responseConsumer: ClientConsumer) {
log.info("Starting out of process verification service")
monitoringService.metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
responseConsumer.setMessageHandler { message ->
val response = VerifierApi.VerificationResponse.fromClientMessage(message)
val handle = verificationHandles.remove(response.verificationId) ?:
throw VerificationResultForUnknownTransaction(response.verificationId)
handle.durationTimerContext.stop()
val exception = response.exception
if (exception == null) {
successMeter.mark()
handle.resultFuture.set(Unit)
} else {
failureMeter.mark()
handle.resultFuture.setException(exception)
}
}
}
abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction)
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
log.info("Verifying ${transaction.id}")
val future = SettableFuture.create<Unit>()
val nonce = random63BitValue()
verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time())
sendRequest(nonce, transaction)
return future
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.ListeningScheduledExecutorService
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.Uninterruptibles
import net.corda.core.utilities.loggerFor
@ -51,13 +52,12 @@ interface AffinityExecutor : Executor {
* tasks in the future and verify code is running on the executor.
*/
open class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor,
ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue<Runnable>()) {
ScheduledThreadPoolExecutor(numThreads) {
companion object {
val logger = loggerFor<ServiceAffinityExecutor>()
}
private val threads = Collections.synchronizedSet(HashSet<Thread>())
private val uncaughtExceptionHandler = Thread.currentThread().uncaughtExceptionHandler
init {
setThreadFactory(fun(runnable: Runnable): Thread {
@ -77,11 +77,6 @@ interface AffinityExecutor : Executor {
})
}
override fun afterExecute(r: Runnable, t: Throwable?) {
if (t != null)
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
override val isOnThread: Boolean get() = Thread.currentThread() in threads
override fun flush() {

View File

@ -14,4 +14,5 @@ devMode = true
certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
useHTTPS = false
h2port = 0
useTestClock = false
useTestClock = false
verifierType = InMemory

View File

@ -16,6 +16,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.persistence.DataVending
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MOCK_IDENTITY_SERVICE
import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStorageService
@ -32,8 +33,11 @@ open class MockServiceHubInternal(
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
val schemas: SchemaService? = NodeSchemaService()
val schemas: SchemaService? = NodeSchemaService(),
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2)
) : ServiceHubInternal() {
override val transactionVerifierService: TransactionVerifierService
get() = customTransactionVerifierService ?: throw UnsupportedOperationException()
override val vaultService: VaultService
get() = customVault ?: throw UnsupportedOperationException()
override val keyManagementService: KeyManagementService

View File

@ -1,5 +1,6 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
@ -14,6 +15,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -226,7 +228,8 @@ class ArtemisMessagingTests {
identity.public.composite,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database,
networkMapRegistrationFuture).apply {
networkMapRegistrationFuture,
MonitoringService(MetricRegistry())).apply {
config.configureWithDevSSLCertificate()
messagingClient = this
}

View File

@ -82,18 +82,4 @@ class AffinityExecutorTests {
latch.countDown()
executor.flush()
}
@Test fun `exceptions are reported to the specified handler`() {
val exception = AtomicReference<Throwable?>()
// Run in a separate thread to avoid messing with any default exception handlers in the unit test thread.
thread {
Thread.currentThread().setUncaughtExceptionHandler { thread, throwable -> exception.set(throwable) }
_executor = AffinityExecutor.ServiceAffinityExecutor("test3", 1)
executor.execute {
throw Exception("foo")
}
executor.flush()
}.join()
assertEquals("foo", exception.get()?.message)
}
}