Merge pull request #340 from corda/aslemmer-verifier-split

Verifier split
This commit is contained in:
Andras Slemmer 2017-03-27 18:21:18 +01:00 committed by GitHub
commit 259bdf4261
48 changed files with 1457 additions and 239 deletions

View File

@ -1,7 +1,6 @@
package net.corda.client.rpc package net.corda.client.rpc
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
import net.corda.core.logElapsedTime import net.corda.core.logElapsedTime
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
@ -12,6 +11,7 @@ import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.RPCException import net.corda.nodeapi.RPCException
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.rpcLog import net.corda.nodeapi.rpcLog
import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ActiveMQClient

View File

@ -102,7 +102,8 @@ fun <T> ListenableFuture<T>.failure(executor: Executor, body: (Throwable) -> Uni
infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) } infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) } infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) } infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, Function { mapper(it!!) }) @Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, Function { (mapper as (F?) -> T)(it) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) } infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */ /** Executes the given block and sets the future to either the result, or any exception that was thrown. */
inline fun <T> SettableFuture<T>.catch(block: () -> T) { inline fun <T> SettableFuture<T>.catch(block: () -> T) {

View File

@ -43,6 +43,7 @@ interface ServiceHub : ServicesForResolution {
override val storageService: StorageService override val storageService: StorageService
val networkMapCache: NetworkMapCache val networkMapCache: NetworkMapCache
val schedulerService: SchedulerService val schedulerService: SchedulerService
val transactionVerifierService: TransactionVerifierService
val clock: Clock val clock: Clock
val myInfo: NodeInfo val myInfo: NodeInfo

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.*
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction import net.corda.core.transactions.WireTransaction
import rx.Observable import rx.Observable
@ -366,3 +367,14 @@ interface SchedulerService {
/** Unschedule all activity for a TX output, probably because it was consumed. */ /** Unschedule all activity for a TX output, probably because it was consumed. */
fun unscheduleStateActivity(ref: StateRef) fun unscheduleStateActivity(ref: StateRef)
} }
/**
* Provides verification service. The implementation may be a simple in-memory verify() call or perhaps an IPC/RPC.
*/
interface TransactionVerifierService {
/**
* @param transaction The transaction to be verified.
* @return A future that completes successfully if the transaction verified, or sets an exception the verifier threw.
*/
fun verify(transaction: LedgerTransaction): ListenableFuture<*>
}

View File

@ -72,16 +72,36 @@ class WireTransaction(
*/ */
@Throws(AttachmentResolutionException::class, TransactionResolutionException::class) @Throws(AttachmentResolutionException::class, TransactionResolutionException::class)
fun toLedgerTransaction(services: ServicesForResolution): LedgerTransaction { fun toLedgerTransaction(services: ServicesForResolution): LedgerTransaction {
return toLedgerTransaction(
resolveIdentity = { services.identityService.partyFromKey(it) },
resolveAttachment = { services.storageService.attachments.openAttachment(it) },
resolveStateRef = { services.loadState(it) }
)
}
/**
* Looks up identities, attachments and dependent input states using the provided lookup functions in order to
* construct a [LedgerTransaction]. Note that identity lookup failure does *not* cause an exception to be thrown.
*
* @throws AttachmentResolutionException if a required attachment was not found using [resolveAttachment].
* @throws TransactionResolutionException if an input was not found not using [resolveStateRef].
*/
@Throws(AttachmentResolutionException::class, TransactionResolutionException::class)
fun toLedgerTransaction(
resolveIdentity: (CompositeKey) -> Party?,
resolveAttachment: (SecureHash) -> Attachment?,
resolveStateRef: (StateRef) -> TransactionState<*>?
): LedgerTransaction {
// Look up public keys to authenticated identities. This is just a stub placeholder and will all change in future. // Look up public keys to authenticated identities. This is just a stub placeholder and will all change in future.
val authenticatedArgs = commands.map { val authenticatedArgs = commands.map {
val parties = it.signers.mapNotNull { pk -> services.identityService.partyFromKey(pk) } val parties = it.signers.mapNotNull { pk -> resolveIdentity(pk) }
AuthenticatedObject(it.signers, parties, it.value) AuthenticatedObject(it.signers, parties, it.value)
} }
// Open attachments specified in this transaction. If we haven't downloaded them, we fail. // Open attachments specified in this transaction. If we haven't downloaded them, we fail.
val attachments = attachments.map { val attachments = attachments.map { resolveAttachment(it) ?: throw AttachmentResolutionException(it) }
services.storageService.attachments.openAttachment(it) ?: throw AttachmentResolutionException(it) val resolvedInputs = inputs.map { ref ->
resolveStateRef(ref)?.let { StateAndRef(it, ref) } ?: throw TransactionResolutionException(ref.txhash)
} }
val resolvedInputs = inputs.map { StateAndRef(services.loadState(it), it) }
return LedgerTransaction(resolvedInputs, outputs, authenticatedArgs, attachments, id, notary, mustSign, timestamp, type) return LedgerTransaction(resolvedInputs, outputs, authenticatedArgs, attachments, id, notary, mustSign, timestamp, type)
} }

View File

@ -0,0 +1,47 @@
package net.corda.core.utilities
import java.nio.file.Path
object ProcessUtilities {
inline fun <reified C : Any> startJavaProcess(
arguments: List<String>,
jdwpPort: Int? = null,
extraJvmArguments: List<String> = emptyList(),
inheritIO: Boolean = true,
errorLogPath: Path? = null,
workingDirectory: Path? = null
): Process {
return startJavaProcess(C::class.java.name, arguments, jdwpPort, extraJvmArguments, inheritIO, errorLogPath, workingDirectory)
}
fun startJavaProcess(
className: String,
arguments: List<String>,
jdwpPort: Int? = null,
extraJvmArguments: List<String> = emptyList(),
inheritIO: Boolean = true,
errorLogPath: Path? = null,
workingDirectory: Path? = null
): Process {
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val javaPath = System.getProperty("java.home") + separator + "bin" + separator + "java"
val debugPortArgument = if (jdwpPort == null) {
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$jdwpPort")
} else {
emptyList()
}
val allArguments = listOf(javaPath) +
debugPortArgument +
listOf("-Xmx200m", "-XX:+UseG1GC") +
extraJvmArguments +
listOf("-cp", classpath, className) +
arguments.toList()
return ProcessBuilder(allArguments).apply {
if (errorLogPath != null) redirectError(errorLogPath.toFile())
if (inheritIO) inheritIO()
if (workingDirectory != null) directory(workingDirectory.toFile())
}.start()
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.checkedAdd
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.node.recordTransactions import net.corda.core.node.recordTransactions
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.LedgerTransaction
@ -107,7 +108,9 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
for (stx in newTxns) { for (stx in newTxns) {
// Resolve to a LedgerTransaction and then run all contracts. // Resolve to a LedgerTransaction and then run all contracts.
val ltx = stx.toLedgerTransaction(serviceHub) val ltx = stx.toLedgerTransaction(serviceHub)
ltx.verify() // Block on each verification request.
// TODO We could recover some parallelism from the dependency graph.
serviceHub.transactionVerifierService.verify(ltx).getOrThrow()
serviceHub.recordTransactions(stx) serviceHub.recordTransactions(stx)
result += ltx result += ltx
} }

View File

@ -0,0 +1,9 @@
myLegalName : "Bank A"
nearestCity : "London"
p2pAddress : "my-corda-node:10002"
webAddress : "localhost:10003"
networkMapService : {
address : "my-network-map:10000"
legalName : "Network Map Service"
}
verifierType: "OutOfProcess"

View File

@ -0,0 +1,3 @@
nodeHostAndPort: "my-corda-node:10002"
keyStorePassword : "cordacadevpass"
trustStorePassword : "trustpass"

View File

@ -0,0 +1,52 @@
package net.corda.docs
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.verifier.Verifier
import org.junit.Test
import java.nio.file.Path
import java.nio.file.Paths
import kotlin.reflect.declaredMemberProperties
class ExampleConfigTest {
private fun <A : Any> readAndCheckConfigurations(vararg configFilenames: String, loadConfig: (Path) -> A) {
configFilenames.forEach {
println("Checking $it")
val configFileResource = ExampleConfigTest::class.java.classLoader.getResource(it)
val config = loadConfig(Paths.get(configFileResource.toURI()))
// Force the config fields as they are resolved lazily
config.javaClass.kotlin.declaredMemberProperties.forEach { member ->
member.get(config)
}
}
}
@Test
fun `example node_confs parses fine`() {
readAndCheckConfigurations(
"example-node.conf",
"example-out-of-process-verifier-node.conf",
"example-network-map-node.conf"
) {
val baseDirectory = Paths.get("some-example-base-dir")
FullNodeConfiguration(
baseDirectory,
ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
configFile = it
)
)
}
}
@Test
fun `example verifier_conf parses fine`() {
readAndCheckConfigurations(
"example-verifier.conf"
) {
val baseDirectory = Paths.get("some-example-base-dir")
Verifier.loadConfiguration(baseDirectory, it)
}
}
}

View File

@ -1,34 +0,0 @@
package net.corda.docs
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import org.junit.Test
import java.nio.file.Paths
import kotlin.reflect.declaredMemberProperties
class ExampleNodeConfTest {
@Test
fun exampleNodeConfParsesFine() {
val exampleNodeConfFilenames = arrayOf(
"example-node.conf",
"example-network-map-node.conf"
)
exampleNodeConfFilenames.forEach {
println("Checking $it")
val configResource = ExampleNodeConfTest::class.java.classLoader.getResource(it)
val baseDirectory = Paths.get("some-example-base-dir")
val nodeConfig = FullNodeConfiguration(
baseDirectory,
ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
configFile = Paths.get(configResource.toURI())
)
)
// Force the config fields as they are resolved lazily
nodeConfig.javaClass.kotlin.declaredMemberProperties.forEach { member ->
member.get(nodeConfig)
}
}
}
}

View File

@ -134,6 +134,7 @@ Documentation Contents:
further-notes-on-kotlin further-notes-on-kotlin
publishing-corda publishing-corda
azure-vm azure-vm
out-of-process-verification
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2

View File

@ -0,0 +1,26 @@
Out of process verification
===========================
A Corda node does transaction verification through ``ServiceHub.transactionVerifierService``. This is by default an
``InMemoryTransactionVerifierService`` which just verifies transactions in-process.
Corda may be configured to use out of process verification. Any number of verifiers may be started connecting to a node
through the node's exposed artemis SSL port. The messaging layer takes care of load balancing.
.. note:: We plan to introduce kernel level sandboxing around the out of process verifiers as an additional line of
defence in case of inner sandbox escapes.
To configure a node to use out of process verification specify the ``verifierType`` option in your node.conf:
.. literalinclude:: example-code/src/main/resources/example-out-of-process-verifier-node.conf
:language: cfg
You can build a verifier jar using ``./gradlew verifier:standaloneJar``.
And run it with ``java -jar verifier/build/libs/corda-verifier.jar <PATH_TO_VERIFIER_BASE_DIR>``.
``PATH_TO_VERIFIER_BASE_DIR`` should contain a ``certificates`` folder akin to the one in a node directory, and a
``verifier.conf`` containing the following:
.. literalinclude:: example-code/src/main/resources/example-verifier.conf
:language: cfg

View File

@ -2,7 +2,6 @@ package net.corda.nodeapi
import com.google.common.annotations.VisibleForTesting import com.google.common.annotations.VisibleForTesting
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.CompositeKey
import net.corda.core.messaging.MessageRecipientGroup import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
@ -10,6 +9,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.read import net.corda.core.read
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.nodeapi.config.SSLConfiguration
import java.security.KeyStore import java.security.KeyStore
/** /**

View File

@ -0,0 +1,59 @@
package net.corda.nodeapi
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.LedgerTransaction
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.reader.MessageUtil
object VerifierApi {
val VERIFIER_USERNAME = "SystemUsers/Verifier"
val VERIFICATION_REQUESTS_QUEUE_NAME = "verifier.requests"
val VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX = "verifier.responses"
private val VERIFICATION_ID_FIELD_NAME = "id"
private val RESULT_EXCEPTION_FIELD_NAME = "result-exception"
data class VerificationRequest(
val verificationId: Long,
val transaction: LedgerTransaction,
val responseAddress: SimpleString
) {
companion object {
fun fromClientMessage(message: ClientMessage): VerificationRequest {
return VerificationRequest(
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }.deserialize(),
MessageUtil.getJMSReplyTo(message)
)
}
}
fun writeToClientMessage(message: ClientMessage) {
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
message.writeBodyBufferBytes(transaction.serialize().bytes)
MessageUtil.setJMSReplyTo(message, responseAddress)
}
}
data class VerificationResponse(
val verificationId: Long,
val exception: Throwable?
) {
companion object {
fun fromClientMessage(message: ClientMessage): VerificationResponse {
return VerificationResponse(
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
message.getBytesProperty(RESULT_EXCEPTION_FIELD_NAME)?.deserialize()
)
}
}
fun writeToClientMessage(message: ClientMessage) {
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
if (exception != null) {
message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize().bytes)
}
}
}
}

View File

@ -3,7 +3,6 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.client.rpc.CordaRPCClientImpl import net.corda.client.rpc.CordaRPCClientImpl
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.composite import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.generateKeyPair
@ -23,6 +22,7 @@ import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.configureTestSSL import net.corda.testing.configureTestSSL
import net.corda.testing.messaging.SimpleMQClient import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest import net.corda.testing.node.NodeBasedTest
@ -82,7 +82,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
} }
@Test @Test
fun `create queue for peer which has not been communciated with`() { fun `create queue for peer which has not been communicated with`() {
val bob = startNode("Bob").getOrThrow() val bob = startNode("Bob").getOrThrow()
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}") assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}")
} }

View File

@ -1,12 +1,13 @@
@file:JvmName("Driver") @file:JvmName("Driver")
package net.corda.node.driver package net.corda.node.driver
import co.paralleluniverse.common.util.ProcessUtil
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.* import com.google.common.util.concurrent.*
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.core.ThreadBox
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.core.ThreadBox
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.div import net.corda.core.div
import net.corda.core.flatMap import net.corda.core.flatMap
@ -15,11 +16,12 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.ProcessUtilities
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.LOGS_DIRECTORY_NAME import net.corda.node.LOGS_DIRECTORY_NAME
import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.config.VerifierType
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.ServiceIdentityGenerator
@ -68,6 +70,7 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null, fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(), advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(), rpcUsers: List<User> = emptyList(),
verifierType: VerifierType = VerifierType.InMemory,
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle> customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle>
/** /**
@ -83,6 +86,7 @@ interface DriverDSLExposedInterface {
notaryName: String, notaryName: String,
clusterSize: Int = 3, clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type, type: ServiceType = RaftValidatingNotaryService.type,
verifierType: VerifierType = VerifierType.InMemory,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>> rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
/** /**
@ -344,7 +348,6 @@ class DriverDSL(
val shutdownManager = ShutdownManager(executorService) val shutdownManager = ShutdownManager(executorService)
class State { class State {
val clients = LinkedList<NodeMessagingClient>()
val processes = ArrayList<ListenableFuture<Process>>() val processes = ArrayList<ListenableFuture<Process>>()
} }
@ -373,9 +376,6 @@ class DriverDSL(
} }
override fun shutdown() { override fun shutdown() {
state.locked {
clients.forEach(NodeMessagingClient::stop)
}
shutdownManager.shutdown() shutdownManager.shutdown()
// Check that we shut down properly // Check that we shut down properly
@ -396,8 +396,13 @@ class DriverDSL(
} }
} }
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>, override fun startNode(
rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> { providedName: String?,
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>
): ListenableFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort() val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort() val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort()
@ -422,7 +427,8 @@ class DriverDSL(
"password" to it.password, "password" to it.password,
"permissions" to it.permissions "permissions" to it.permissions
) )
} },
"verifierType" to verifierType.name
) + customOverrides ) + customOverrides
val configuration = FullNodeConfiguration( val configuration = FullNodeConfiguration(
@ -450,6 +456,7 @@ class DriverDSL(
notaryName: String, notaryName: String,
clusterSize: Int, clusterSize: Int,
type: ServiceType, type: ServiceType,
verifierType: VerifierType,
rpcUsers: List<User> rpcUsers: List<User>
): ListenableFuture<Pair<Party, List<NodeHandle>>> { ): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { "Notary Node $it" } val nodeNames = (1..clusterSize).map { "Notary Node $it" }
@ -461,12 +468,12 @@ class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort() val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster // Start the first node that will bootstrap the cluster
val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
// All other nodes will join the cluster // All other nodes will join the cluster
val restNotaryFutures = nodeNames.drop(1).map { val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort() val nodeAddress = portAllocation.nextHostAndPort()
val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString())) val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))
startNode(it, advertisedService, rpcUsers, configOverride) startNode(it, advertisedService, rpcUsers, verifierType, configOverride)
} }
return firstNotaryFuture.flatMap { firstNotary -> return firstNotaryFuture.flatMap { firstNotary ->
@ -547,78 +554,53 @@ class DriverDSL(
fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size] fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size]
private fun startNode( private fun startNode(
executorService: ScheduledExecutorService, executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration, nodeConf: FullNodeConfiguration,
quasarJarPath: String, quasarJarPath: String,
debugPort: Int?, debugPort: Int?,
overriddenSystemProperties: Map<String, String> overriddenSystemProperties: Map<String, String>
): ListenableFuture<Process> { ): ListenableFuture<Process> {
return executorService.submit<Process> {
// Write node.conf // Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", nodeConf.config) writeConfig(nodeConf.baseDirectory, "node.conf", nodeConf.config)
val className = "net.corda.node.Corda" // cannot directly get class for this, so just use string
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val debugPortArg = if (debugPort != null)
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort"
else
""
val systemProperties = mapOf( val systemProperties = mapOf(
"name" to nodeConf.myLegalName, "name" to nodeConf.myLegalName,
"visualvm.display.name" to "Corda" "visualvm.display.name" to "Corda"
) + overriddenSystemProperties ) + overriddenSystemProperties
val extraJvmArguments = systemProperties.map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val javaArgs = listOf(path) +
systemProperties.map { "-D${it.key}=${it.value}" } + ProcessUtilities.startJavaProcess(
listOf( className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
"-javaagent:$quasarJarPath", arguments = listOf(
debugPortArg,
"-Xmx200m",
"-XX:+UseG1GC",
"-cp", classpath, className,
"--base-directory=${nodeConf.baseDirectory}", "--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel", "--logging-level=$loggingLevel",
"--no-local-shell" "--no-local-shell"
).filter(String::isNotEmpty) ),
val process = ProcessBuilder(javaArgs) extraJvmArguments = extraJvmArguments,
.redirectError((nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log").toFile()) errorLogPath = nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log",
.inheritIO() workingDirectory = nodeConf.baseDirectory
.directory(nodeConf.baseDirectory.toFile()) )
.start() }.flatMap { process -> addressMustBeBound(executorService, nodeConf.p2pAddress).map { process } }
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. Needs rethinking.
return addressMustBeBound(executorService, nodeConf.p2pAddress).map { process }
} }
private fun startWebserver( private fun startWebserver(
executorService: ScheduledExecutorService, executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration, nodeConf: FullNodeConfiguration,
debugPort: Int?): ListenableFuture<Process> { debugPort: Int?
val className = "net.corda.webserver.WebServer" // cannot directly get class for this, so just use string ): ListenableFuture<Process> {
val separator = System.getProperty("file.separator") return executorService.submit<Process> {
val classpath = System.getProperty("java.class.path") val className = "net.corda.webserver.WebServer"
val path = System.getProperty("java.home") + separator + "bin" + separator + "java" ProcessUtilities.startJavaProcess(
className = className, // cannot directly get class for this, so just use string
val debugPortArg = if (debugPort != null) arguments = listOf("--base-directory", nodeConf.baseDirectory.toString()),
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort") jdwpPort = debugPort,
else extraJvmArguments = listOf("-Dname=node-${nodeConf.p2pAddress}-webserver"),
emptyList() errorLogPath = Paths.get("error.$className.log")
)
val javaArgs = listOf(path) + }.flatMap { process -> addressMustBeBound(executorService, nodeConf.webAddress).map { process } }
listOf("-Dname=node-${nodeConf.p2pAddress}-webserver") + debugPortArg +
listOf(
"-cp", classpath, className,
"--base-directory", nodeConf.baseDirectory.toString())
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return addressMustBeBound(executorService, nodeConf.webAddress).map { process }
} }
} }
} }

View File

@ -118,6 +118,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val clock: Clock = platformClock override val clock: Clock = platformClock
override val myInfo: NodeInfo get() = info override val myInfo: NodeInfo get() = info
override val schemaService: SchemaService get() = schemas override val schemaService: SchemaService get() = schemas
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
// Internal only // Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
@ -154,6 +155,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var keyManagement: KeyManagementService lateinit var keyManagement: KeyManagementService
var inNodeNetworkMapService: NetworkMapService? = null var inNodeNetworkMapService: NetworkMapService? = null
var inNodeNotaryService: NotaryService? = null var inNodeNotaryService: NotaryService? = null
lateinit var txVerifierService: TransactionVerifierService
lateinit var identity: IdentityService lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache lateinit var netMapCache: NetworkMapCache
@ -252,6 +254,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
net = makeMessagingService() net = makeMessagingService()
schemas = makeSchemaService() schemas = makeSchemaService()
vault = makeVaultService(configuration.dataSourceProperties) vault = makeVaultService(configuration.dataSourceProperties)
txVerifierService = makeTransactionVerifierService()
info = makeInfo() info = makeInfo()
identity = makeIdentityService() identity = makeIdentityService()
@ -478,6 +481,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected open fun makeSchemaService(): SchemaService = NodeSchemaService() protected open fun makeSchemaService(): SchemaService = NodeSchemaService()
protected abstract fun makeTransactionVerifierService() : TransactionVerifierService
open fun stop() { open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()

View File

@ -56,6 +56,7 @@ class Node(override val configuration: FullNodeConfiguration,
override val log: Logger get() = logger override val log: Logger get() = logger
override val version: Version get() = nodeVersionInfo.version override val version: Version get() = nodeVersionInfo.version
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress) override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
override fun makeTransactionVerifierService() = (net as NodeMessagingClient).verifierService
// DISCUSSION // DISCUSSION
// //
@ -136,7 +137,8 @@ class Node(override val configuration: FullNodeConfiguration,
myIdentityOrNullIfNetworkMapService, myIdentityOrNullIfNetworkMapService,
serverThread, serverThread,
database, database,
networkMapRegistrationFuture) networkMapRegistrationFuture,
services.monitoringService)
} }
private fun makeLocalMessageBroker(): HostAndPort { private fun makeLocalMessageBroker(): HostAndPort {

View File

@ -7,15 +7,13 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.copyTo import net.corda.core.copyTo
import net.corda.core.createDirectories import net.corda.core.createDirectories
import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.X509Utilities
import net.corda.core.div import net.corda.core.div
import net.corda.core.exists import net.corda.core.exists
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import java.net.URL import net.corda.nodeapi.config.SSLConfiguration
import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
object ConfigHelper { object ConfigHelper {

View File

@ -2,25 +2,29 @@ package net.corda.node.services.config
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.typesafe.config.Config import com.typesafe.config.Config
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.getListOrElse
import net.corda.nodeapi.config.getOrElse
import net.corda.nodeapi.config.getValue
import net.corda.core.div import net.corda.core.div
import net.corda.core.node.NodeVersionInfo import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.NetworkMapInfo
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.serialization.NodeClock import net.corda.node.serialization.NodeClock
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.TestClock import net.corda.node.utilities.TestClock
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.nodeapi.config.getListOrElse
import net.corda.nodeapi.config.getOrElse
import net.corda.nodeapi.config.getValue
import java.net.URL import java.net.URL
import java.nio.file.Path import java.nio.file.Path
import java.util.* import java.util.*
enum class VerifierType {
InMemory,
OutOfProcess
}
interface NodeConfiguration : SSLConfiguration { interface NodeConfiguration : net.corda.nodeapi.config.SSLConfiguration {
val baseDirectory: Path val baseDirectory: Path
override val certificatesDirectory: Path get() = baseDirectory / "certificates" override val certificatesDirectory: Path get() = baseDirectory / "certificates"
val myLegalName: String val myLegalName: String
@ -32,6 +36,8 @@ interface NodeConfiguration : SSLConfiguration {
val rpcUsers: List<User> get() = emptyList() val rpcUsers: List<User> get() = emptyList()
val devMode: Boolean val devMode: Boolean
val certificateSigningService: URL val certificateSigningService: URL
val certificateChainCheckPolicies: Map<String, CertificateChainCheckPolicy>
val verifierType: VerifierType
} }
/** /**
@ -61,6 +67,10 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet() val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet()
User(username, password, permissions) User(username, password, permissions)
} }
override val certificateChainCheckPolicies = config.getOptionalConfig("certificateChainCheckPolicies")?.run {
entrySet().associateByTo(HashMap(), { it.key }, { parseCertificateChainCheckPolicy(getConfig(it.key)) })
} ?: emptyMap<String, CertificateChainCheckPolicy>()
override val verifierType: VerifierType by config
val useHTTPS: Boolean by config val useHTTPS: Boolean by config
val p2pAddress: HostAndPort by config val p2pAddress: HostAndPort by config
val rpcAddress: HostAndPort? by config val rpcAddress: HostAndPort? by config
@ -90,4 +100,15 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
} }
} }
private fun parseCertificateChainCheckPolicy(config: Config): CertificateChainCheckPolicy {
val policy = config.getString("policy")
return when (policy) {
"Any" -> CertificateChainCheckPolicy.Any
"RootMustMatch" -> CertificateChainCheckPolicy.RootMustMatch
"LeafMustMatch" -> CertificateChainCheckPolicy.LeafMustMatch
"MustContainOneOf" -> CertificateChainCheckPolicy.MustContainOneOf(config.getStringList("trustedAliases").toSet())
else -> throw IllegalArgumentException("Invalid certificate chain check policy $policy")
}
}
private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null

View File

@ -22,13 +22,11 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.expectedOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.Configuration
@ -51,8 +49,8 @@ import org.bouncycastle.asn1.x500.X500Name
import rx.Subscription import rx.Subscription
import java.io.IOException import java.io.IOException
import java.math.BigInteger import java.math.BigInteger
import java.security.KeyStore
import java.security.Principal import java.security.Principal
import java.security.PublicKey
import java.util.* import java.util.*
import java.util.concurrent.Executor import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
@ -67,6 +65,7 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
import javax.security.auth.login.FailedLoginException import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule import javax.security.auth.spi.LoginModule
import javax.security.cert.CertificateException
import javax.security.cert.X509Certificate import javax.security.cert.X509Certificate
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. // TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
@ -201,6 +200,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* 1. The node itself. It is given full access to all valid queues. * 1. The node itself. It is given full access to all valid queues.
* 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue. * 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue.
* 3. RPC users. These are only given sufficient access to perform RPC with us. * 3. RPC users. These are only given sufficient access to perform RPC with us.
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
*/ */
private fun ConfigurationImpl.configureAddressSecurity() { private fun ConfigurationImpl.configureAddressSecurity() {
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true) val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
@ -214,6 +214,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
nodeInternalRole, nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)) restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
} }
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.*"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
} }
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -224,9 +226,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
} }
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val rootCAPublicKey = X509Utilities
.loadCertificateFromKeyStore(config.trustStoreFile, config.trustStorePassword, CORDA_ROOT_CA)
.publicKey
val ourCertificate = X509Utilities val ourCertificate = X509Utilities
.loadCertificateFromKeyStore(config.keyStoreFile, config.keyStorePassword, CORDA_CLIENT_CA) .loadCertificateFromKeyStore(config.keyStoreFile, config.keyStorePassword, CORDA_CLIENT_CA)
val ourSubjectDN = X500Name(ourCertificate.subjectDN.name) val ourSubjectDN = X500Name(ourCertificate.subjectDN.name)
@ -234,13 +233,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
require(ourSubjectDN.commonName == config.myLegalName) { require(ourSubjectDN.commonName == config.myLegalName) {
"Legal name does not match with our subject CN: $ourSubjectDN" "Legal name does not match with our subject CN: $ourSubjectDN"
} }
val defaultCertPolicies = mapOf(
PEER_ROLE to CertificateChainCheckPolicy.RootMustMatch,
NODE_ROLE to CertificateChainCheckPolicy.LeafMustMatch,
VERIFIER_ROLE to CertificateChainCheckPolicy.RootMustMatch
)
val keyStore = X509Utilities.loadKeyStore(config.keyStoreFile, config.keyStorePassword)
val trustStore = X509Utilities.loadKeyStore(config.trustStoreFile, config.trustStorePassword)
val certChecks = defaultCertPolicies.mapValues {
(config.certificateChainCheckPolicies[it.key] ?: it.value).createCheck(keyStore, trustStore)
}
val securityConfig = object : SecurityConfiguration() { val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module // Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> { override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf( val options = mapOf(
RPCUserService::class.java.name to userService, RPCUserService::class.java.name to userService,
CORDA_ROOT_CA to rootCAPublicKey, NodeLoginModule.CERT_CHAIN_CHECKS_OPTION_NAME to certChecks)
CORDA_CLIENT_CA to ourCertificate.publicKey)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options)) return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
} }
} }
@ -448,6 +456,66 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
} }
} }
sealed class CertificateChainCheckPolicy {
@FunctionalInterface
interface Check {
fun checkCertificateChain(theirChain: Array<X509Certificate>)
}
abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check
object Any : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
}
}
}
}
object RootMustMatch : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val rootPublicKey = trustStore.getCertificate(CORDA_ROOT_CA).publicKey
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
val theirRoot = theirChain.last().publicKey
if (rootPublicKey != theirRoot) {
throw CertificateException("Root certificate mismatch, their root = $theirRoot")
}
}
}
}
}
object LeafMustMatch : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val ourPublicKey = keyStore.getCertificate(CORDA_CLIENT_CA).publicKey
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
val theirLeaf = theirChain.first().publicKey
if (ourPublicKey != theirLeaf) {
throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf")
}
}
}
}
}
class MustContainOneOf(val trustedAliases: Set<String>) : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
return object : Check {
override fun checkCertificateChain(theirChain: Array<X509Certificate>) {
if (!theirChain.any { it.publicKey in trustedPublicKeys }) {
throw CertificateException("Their certificate chain contained none of the trusted ones")
}
}
}
}
}
}
/** /**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with * Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate * [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
@ -465,6 +533,9 @@ class NodeLoginModule : LoginModule {
const val PEER_ROLE = "SystemRoles/Peer" const val PEER_ROLE = "SystemRoles/Peer"
const val NODE_ROLE = "SystemRoles/Node" const val NODE_ROLE = "SystemRoles/Node"
const val RPC_ROLE = "SystemRoles/RPC" const val RPC_ROLE = "SystemRoles/RPC"
const val VERIFIER_ROLE = "SystemRoles/Verifier"
const val CERT_CHAIN_CHECKS_OPTION_NAME = "CertChainChecks"
val log = loggerFor<NodeLoginModule>() val log = loggerFor<NodeLoginModule>()
} }
@ -473,23 +544,26 @@ class NodeLoginModule : LoginModule {
private lateinit var subject: Subject private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler private lateinit var callbackHandler: CallbackHandler
private lateinit var userService: RPCUserService private lateinit var userService: RPCUserService
private lateinit var ourRootCAPublicKey: PublicKey private lateinit var peerCertCheck: CertificateChainCheckPolicy.Check
private lateinit var ourPublicKey: PublicKey private lateinit var nodeCertCheck: CertificateChainCheckPolicy.Check
private lateinit var verifierCertCheck: CertificateChainCheckPolicy.Check
private val principals = ArrayList<Principal>() private val principals = ArrayList<Principal>()
@Suppress("UNCHECKED_CAST")
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) { override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject this.subject = subject
this.callbackHandler = callbackHandler this.callbackHandler = callbackHandler
userService = options[RPCUserService::class.java.name] as RPCUserService userService = options[RPCUserService::class.java.name] as RPCUserService
ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey val certChainChecks = options[CERT_CHAIN_CHECKS_OPTION_NAME] as Map<String, CertificateChainCheckPolicy.Check>
ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey peerCertCheck = certChainChecks[PEER_ROLE]!!
nodeCertCheck = certChainChecks[NODE_ROLE]!!
verifierCertCheck = certChainChecks[VERIFIER_ROLE]!!
} }
override fun login(): Boolean { override fun login(): Boolean {
val nameCallback = NameCallback("Username: ") val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false) val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback() val certificateCallback = CertificateCallback()
try { try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback)) callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
} catch (e: IOException) { } catch (e: IOException) {
@ -504,9 +578,11 @@ class NodeLoginModule : LoginModule {
log.info("Processing login for $username") log.info("Processing login for $username")
try {
val validatedUser = when (determineUserRole(certificates, username)) { val validatedUser = when (determineUserRole(certificates, username)) {
PEER_ROLE -> authenticatePeer(certificates) PEER_ROLE -> authenticatePeer(certificates)
NODE_ROLE -> authenticateNode(certificates) NODE_ROLE -> authenticateNode(certificates)
VERIFIER_ROLE -> authenticateVerifier(certificates)
RPC_ROLE -> authenticateRpcUser(password, username) RPC_ROLE -> authenticateRpcUser(password, username)
else -> throw FailedLoginException("Peer does not belong on our network") else -> throw FailedLoginException("Peer does not belong on our network")
} }
@ -514,22 +590,26 @@ class NodeLoginModule : LoginModule {
loginSucceeded = true loginSucceeded = true
return loginSucceeded return loginSucceeded
} catch (exception: FailedLoginException) {
log.warn("$exception")
throw exception
}
} }
private fun authenticateNode(certificates: Array<X509Certificate>): String { private fun authenticateNode(certificates: Array<X509Certificate>): String {
val peerCertificate = certificates.first() nodeCertCheck.checkCertificateChain(certificates)
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
principals += RolePrincipal(NODE_ROLE) principals += RolePrincipal(NODE_ROLE)
return peerCertificate.subjectDN.name return certificates.first().subjectDN.name
}
private fun authenticateVerifier(certificates: Array<X509Certificate>): String {
verifierCertCheck.checkCertificateChain(certificates)
principals += RolePrincipal(VERIFIER_ROLE)
return certificates.first().subjectDN.name
} }
private fun authenticatePeer(certificates: Array<X509Certificate>): String { private fun authenticatePeer(certificates: Array<X509Certificate>): String {
val theirRootCAPublicKey = certificates.last().publicKey peerCertCheck.checkCertificateChain(certificates)
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
principals += RolePrincipal(PEER_ROLE) principals += RolePrincipal(PEER_ROLE)
return certificates.first().subjectDN.name return certificates.first().subjectDN.name
} }
@ -547,16 +627,30 @@ class NodeLoginModule : LoginModule {
} }
private fun determineUserRole(certificates: Array<X509Certificate>?, username: String): String? { private fun determineUserRole(certificates: Array<X509Certificate>?, username: String): String? {
return if (username == PEER_USER || username == NODE_USER) { fun requireTls() = require(certificates != null) { "No TLS?" }
certificates ?: throw FailedLoginException("No TLS?") return when (username) {
if (username == PEER_USER) PEER_ROLE else NODE_ROLE PEER_USER -> {
} else if (certificates == null) { requireTls()
PEER_ROLE
}
NODE_USER -> {
requireTls()
NODE_ROLE
}
VerifierApi.VERIFIER_USERNAME -> {
requireTls()
VERIFIER_ROLE
}
else -> {
// Assume they're an RPC user if its from a non-ssl connection // Assume they're an RPC user if its from a non-ssl connection
if (certificates == null) {
RPC_ROLE RPC_ROLE
} else { } else {
null null
} }
} }
}
}
override fun commit(): Boolean { override fun commit(): Boolean {
val result = loginSucceeded val result = loginSucceeded

View File

@ -7,19 +7,29 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.messaging.* import net.corda.core.messaging.*
import net.corda.core.node.NodeVersionInfo import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque import net.corda.core.serialization.opaque
import net.corda.core.success import net.corda.core.success
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.* import net.corda.node.utilities.*
import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -33,6 +43,7 @@ import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox // TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -62,7 +73,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val myIdentity: CompositeKey?, val myIdentity: CompositeKey?,
val nodeExecutor: AffinityExecutor, val nodeExecutor: AffinityExecutor,
val database: Database, val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal { val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService
) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object { companion object {
private val log = loggerFor<NodeMessagingClient>() private val log = loggerFor<NodeMessagingClient>()
@ -75,6 +88,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private val nodeVersionProperty = SimpleString("node-version") private val nodeVersionProperty = SimpleString("node-version")
private val nodeVendorProperty = SimpleString("node-vendor") private val nodeVendorProperty = SimpleString("node-vendor")
private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0")) private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
} }
private class InnerState { private class InnerState {
@ -88,6 +102,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Consumer for inbound client RPC messages. // Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null
var verificationResponseConsumer: ClientConsumer? = null
}
val verifierService = when (config.verifierType) {
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
} }
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
@ -163,6 +182,19 @@ class NodeMessagingClient(override val config: NodeConfiguration,
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE) rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName) rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)
fun checkVerifierCount() {
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
}
}
if (config.verifierType == VerifierType.OutOfProcess) {
createQueueIfAbsent(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)
createQueueIfAbsent(verifierResponseAddress)
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
}
} }
} }
@ -224,6 +256,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
check(!running) { "run can't be called twice" } check(!running) { "run can't be called twice" }
running = true running = true
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor) rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor)
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
p2pConsumer!! p2pConsumer!!
} }
@ -463,6 +496,23 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} }
} }
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
return object : OutOfProcessTransactionVerifierService(monitoringService) {
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {
messagingExecutor.fetchFrom {
state.locked {
val message = session!!.createMessage(false)
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
request.writeToClientMessage(message)
producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
}
}
}
}
}
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) { return when (partyInfo) {
is PartyInfo.Node -> partyInfo.node.address is PartyInfo.Node -> partyInfo.node.address

View File

@ -0,0 +1,18 @@
package net.corda.node.services.transactions
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import java.util.concurrent.Executors
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService {
private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers))
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
return workerPool.submit {
transaction.verify()
}
}
}

View File

@ -0,0 +1,70 @@
package net.corda.node.services.transactions
import com.codahale.metrics.Gauge
import com.codahale.metrics.Timer
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.MonitoringService
import net.corda.nodeapi.VerifierApi
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import java.util.concurrent.ConcurrentHashMap
abstract class OutOfProcessTransactionVerifierService(
val monitoringService: MonitoringService
) : SingletonSerializeAsToken(), TransactionVerifierService {
companion object {
val log = loggerFor<OutOfProcessTransactionVerifierService>()
}
private data class VerificationHandle(
val transactionId: SecureHash,
val resultFuture: SettableFuture<Unit>,
val durationTimerContext: Timer.Context
)
private val verificationHandles = ConcurrentHashMap<Long, VerificationHandle>()
// Metrics
private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name"
private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration"))
private val successMeter = monitoringService.metrics.meter(metric("Verification.Success"))
private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure"))
class VerificationResultForUnknownTransaction(nonce: Long) :
Exception("Verification result arrived for unknown transaction nonce $nonce")
fun start(responseConsumer: ClientConsumer) {
log.info("Starting out of process verification service")
monitoringService.metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
responseConsumer.setMessageHandler { message ->
val response = VerifierApi.VerificationResponse.fromClientMessage(message)
val handle = verificationHandles.remove(response.verificationId) ?:
throw VerificationResultForUnknownTransaction(response.verificationId)
handle.durationTimerContext.stop()
val exception = response.exception
if (exception == null) {
successMeter.mark()
handle.resultFuture.set(Unit)
} else {
failureMeter.mark()
handle.resultFuture.setException(exception)
}
}
}
abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction)
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
log.info("Verifying ${transaction.id}")
val future = SettableFuture.create<Unit>()
val nonce = random63BitValue()
verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time())
sendRequest(nonce, transaction)
return future
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.utilities package net.corda.node.utilities
import com.google.common.util.concurrent.ListeningScheduledExecutorService
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.Uninterruptibles import com.google.common.util.concurrent.Uninterruptibles
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
@ -51,13 +52,12 @@ interface AffinityExecutor : Executor {
* tasks in the future and verify code is running on the executor. * tasks in the future and verify code is running on the executor.
*/ */
open class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor, open class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor,
ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue<Runnable>()) { ScheduledThreadPoolExecutor(numThreads) {
companion object { companion object {
val logger = loggerFor<ServiceAffinityExecutor>() val logger = loggerFor<ServiceAffinityExecutor>()
} }
private val threads = Collections.synchronizedSet(HashSet<Thread>()) private val threads = Collections.synchronizedSet(HashSet<Thread>())
private val uncaughtExceptionHandler = Thread.currentThread().uncaughtExceptionHandler
init { init {
setThreadFactory(fun(runnable: Runnable): Thread { setThreadFactory(fun(runnable: Runnable): Thread {
@ -77,11 +77,6 @@ interface AffinityExecutor : Executor {
}) })
} }
override fun afterExecute(r: Runnable, t: Throwable?) {
if (t != null)
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
override val isOnThread: Boolean get() = Thread.currentThread() in threads override val isOnThread: Boolean get() = Thread.currentThread() in threads
override fun flush() { override fun flush() {

View File

@ -15,3 +15,4 @@ certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
useHTTPS = false useHTTPS = false
h2port = 0 h2port = 0
useTestClock = false useTestClock = false
verifierType = InMemory

View File

@ -16,6 +16,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.persistence.DataVending import net.corda.node.services.persistence.DataVending
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.MOCK_IDENTITY_SERVICE
import net.corda.testing.node.MockNetworkMapCache import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStorageService import net.corda.testing.node.MockStorageService
@ -32,8 +33,11 @@ open class MockServiceHubInternal(
val scheduler: SchedulerService? = null, val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(), val overrideClock: Clock? = NodeClock(),
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(), val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
val schemas: SchemaService? = NodeSchemaService() val schemas: SchemaService? = NodeSchemaService(),
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2)
) : ServiceHubInternal() { ) : ServiceHubInternal() {
override val transactionVerifierService: TransactionVerifierService
get() = customTransactionVerifierService ?: throw UnsupportedOperationException()
override val vaultService: VaultService override val vaultService: VaultService
get() = customVault ?: throw UnsupportedOperationException() get() = customVault ?: throw UnsupportedOperationException()
override val keyManagementService: KeyManagementService override val keyManagementService: KeyManagementService

View File

@ -1,5 +1,6 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
@ -14,6 +15,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
@ -226,7 +228,8 @@ class ArtemisMessagingTests {
identity.public.composite, identity.public.composite,
ServiceAffinityExecutor("ArtemisMessagingTests", 1), ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database, database,
networkMapRegistrationFuture).apply { networkMapRegistrationFuture,
MonitoringService(MetricRegistry())).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
} }

View File

@ -82,18 +82,4 @@ class AffinityExecutorTests {
latch.countDown() latch.countDown()
executor.flush() executor.flush()
} }
@Test fun `exceptions are reported to the specified handler`() {
val exception = AtomicReference<Throwable?>()
// Run in a separate thread to avoid messing with any default exception handlers in the unit test thread.
thread {
Thread.currentThread().setUncaughtExceptionHandler { thread, throwable -> exception.set(throwable) }
_executor = AffinityExecutor.ServiceAffinityExecutor("test3", 1)
executor.execute {
throw Exception("foo")
}
executor.flush()
}.join()
assertEquals("foo", exception.get()?.message)
}
} }

View File

@ -3,7 +3,6 @@ package net.corda.attachmentdemo
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import joptsimple.OptionParser import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.contracts.TransactionType import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -13,6 +12,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.Emoji import net.corda.core.utilities.Emoji
import net.corda.flows.FinalityFlow import net.corda.flows.FinalityFlow
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.ALICE_KEY import net.corda.testing.ALICE_KEY
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths

View File

@ -4,7 +4,6 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import joptsimple.OptionParser import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.div import net.corda.core.div
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
@ -12,6 +11,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.flows.NotaryFlow import net.corda.flows.NotaryFlow
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.notarydemo.flows.DummyIssueAndMove import net.corda.notarydemo.flows.DummyIssueAndMove
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths

View File

@ -16,6 +16,7 @@ include 'client:mock'
include 'client:rpc' include 'client:rpc'
include 'experimental' include 'experimental'
include 'experimental:sandbox' include 'experimental:sandbox'
include 'verifier'
include 'test-utils' include 'test-utils'
include 'tools:explorer' include 'tools:explorer'
include 'tools:explorer:capsule' include 'tools:explorer:capsule'

View File

@ -27,6 +27,7 @@ dependencies {
compile project(':core') compile project(':core')
compile project(':node') compile project(':node')
compile project(':node:webserver') compile project(':node:webserver')
compile project(':verifier')
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"

View File

@ -22,6 +22,8 @@ import net.corda.node.internal.AbstractNode
import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureDevKeyAndTrustStores import net.corda.node.services.config.configureDevKeyAndTrustStores
import net.corda.node.services.config.VerifierType
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.testing.node.MockIdentityService import net.corda.testing.node.MockIdentityService
@ -166,7 +168,9 @@ data class TestNodeConfiguration(
override val emailAddress: String = "", override val emailAddress: String = "",
override val exportJMXto: String = "", override val exportJMXto: String = "",
override val devMode: Boolean = true, override val devMode: Boolean = true,
override val certificateSigningService: URL = URL("http://localhost")) : NodeConfiguration override val certificateSigningService: URL = URL("http://localhost"),
override val certificateChainCheckPolicies: Map<String, CertificateChainCheckPolicy> = emptyMap(),
override val verifierType: VerifierType = VerifierType.InMemory) : NodeConfiguration
fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name)) fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name))

View File

@ -1,34 +0,0 @@
package net.corda.testing
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
fun spawn(className: String, args: List<String>, appName: String): Process {
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val javaArgs = listOf(path, "-Dname=$appName", "-javaagent:lib/quasar.jar", "-cp", classpath, className)
val builder = ProcessBuilder(javaArgs + args)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
val process = builder.start()
return process
}
fun assertExitOrKill(proc: Process) {
try {
assertEquals(proc.waitFor(2, TimeUnit.MINUTES), true)
} catch (e: Throwable) {
proc.destroyForcibly()
throw e
}
}
fun assertAliveAndKill(proc: Process) {
try {
assertEquals(proc.isAlive, true)
} finally {
proc.destroyForcibly()
}
}

View File

@ -1,10 +1,10 @@
package net.corda.testing.messaging package net.corda.testing.messaging
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.configureTestSSL import net.corda.testing.configureTestSSL
import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.*

View File

@ -22,6 +22,7 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.InMemoryUniquenessProvider import net.corda.node.services.transactions.InMemoryUniquenessProvider
import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.transactions.ValidatingNotaryService
@ -198,6 +199,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider() override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider()
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
override fun start(): MockNode { override fun start(): MockNode {
super.start() super.start()
mockNet.identities.add(info.legalIdentity) mockNet.identities.add(info.legalIdentity)

View File

@ -18,6 +18,7 @@ import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransacti
import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MEGA_CORP import net.corda.testing.MEGA_CORP
import net.corda.testing.MINI_CORP import net.corda.testing.MINI_CORP
import net.corda.testing.MOCK_VERSION import net.corda.testing.MOCK_VERSION
@ -68,6 +69,7 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
override val clock: Clock get() = Clock.systemUTC() override val clock: Clock get() = Clock.systemUTC()
override val schedulerService: SchedulerService get() = throw UnsupportedOperationException() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException()
override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION) override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION)
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
fun makeVaultService(dataSourceProps: Properties): VaultService { fun makeVaultService(dataSourceProps: Properties): VaultService {
val vaultService = NodeVaultService(this, dataSourceProps) val vaultService = NodeVaultService(this, dataSourceProps)

View File

@ -1,5 +1,6 @@
package net.corda.testing.node package net.corda.testing.node
import com.codahale.metrics.MetricRegistry
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.composite import net.corda.core.crypto.composite
@ -19,6 +20,7 @@ import org.jetbrains.exposed.sql.Database
import java.io.Closeable import java.io.Closeable
import java.security.KeyPair import java.security.KeyPair
import kotlin.concurrent.thread import kotlin.concurrent.thread
import net.corda.node.services.api.MonitoringService
/** /**
* This is a bare-bones node which can only send and receive messages. It doesn't register with a network map service or * This is a bare-bones node which can only send and receive messages. It doesn't register with a network map service or
@ -29,6 +31,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL
private val databaseWithCloseable: Pair<Closeable, Database> = configureDatabase(config.dataSourceProperties) private val databaseWithCloseable: Pair<Closeable, Database> = configureDatabase(config.dataSourceProperties)
val database: Database get() = databaseWithCloseable.second val database: Database get() = databaseWithCloseable.second
val userService = RPCUserServiceImpl(config) val userService = RPCUserServiceImpl(config)
val monitoringService = MonitoringService(MetricRegistry())
val identity: KeyPair = generateKeyPair() val identity: KeyPair = generateKeyPair()
val executor = ServiceAffinityExecutor(config.myLegalName, 1) val executor = ServiceAffinityExecutor(config.myLegalName, 1)
val broker = ArtemisMessagingServer(config, address, rpcAddress, InMemoryNetworkMapCache(), userService) val broker = ArtemisMessagingServer(config, address, rpcAddress, InMemoryNetworkMapCache(), userService)
@ -41,7 +44,8 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL
identity.public.composite, identity.public.composite,
executor, executor,
database, database,
networkMapRegistrationFuture) networkMapRegistrationFuture,
monitoringService)
} }
fun start() { fun start() {

View File

@ -13,7 +13,6 @@ import net.corda.node.driver.PortAllocation
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.Closeable import java.io.Closeable
import java.nio.file.Path
import java.util.* import java.util.*
private val log = LoggerFactory.getLogger(ConnectionManager::class.java) private val log = LoggerFactory.getLogger(ConnectionManager::class.java)
@ -93,16 +92,15 @@ class ConnectionManager(private val username: String, private val jSch: JSch) {
* safely cleaned up if an exception is thrown. * safely cleaned up if an exception is thrown.
* *
* @param username The UNIX username to use for SSH authentication. * @param username The UNIX username to use for SSH authentication.
* @param nodeHostsAndCertificatesPaths The list of hosts and associated remote paths to the nodes' certificate directories. * @param nodeHosts The list of hosts.
* @param remoteMessagingPort The Artemis messaging port nodes are listening on. * @param remoteMessagingPort The Artemis messaging port nodes are listening on.
* @param tunnelPortAllocation A local port allocation strategy for creating SSH tunnels. * @param tunnelPortAllocation A local port allocation strategy for creating SSH tunnels.
* @param certificatesBaseDirectory A local directory to put downloaded certificates in.
* @param withConnections An action to run once we're connected to the nodes. * @param withConnections An action to run once we're connected to the nodes.
* @return The return value of [withConnections] * @return The return value of [withConnections]
*/ */
fun <A> connectToNodes( fun <A> connectToNodes(
username: String, username: String,
nodeHostsAndCertificatesPaths: List<Pair<String, Path>>, nodeHosts: List<String>,
remoteMessagingPort: Int, remoteMessagingPort: Int,
tunnelPortAllocation: PortAllocation, tunnelPortAllocation: PortAllocation,
rpcUsername: String, rpcUsername: String,
@ -110,9 +108,9 @@ fun <A> connectToNodes(
withConnections: (List<NodeConnection>) -> A withConnections: (List<NodeConnection>) -> A
): A { ): A {
val manager = ConnectionManager(username, setupJSchWithSshAgent()) val manager = ConnectionManager(username, setupJSchWithSshAgent())
val connections = nodeHostsAndCertificatesPaths.parallelStream().map { nodeHostAndCertificatesPath -> val connections = nodeHosts.parallelStream().map { nodeHost ->
manager.connectToNode( manager.connectToNode(
nodeHost = nodeHostAndCertificatesPath.first, nodeHost = nodeHost,
remoteMessagingPort = remoteMessagingPort, remoteMessagingPort = remoteMessagingPort,
localTunnelAddress = tunnelPortAllocation.nextHostAndPort(), localTunnelAddress = tunnelPortAllocation.nextHostAndPort(),
rpcUsername = rpcUsername, rpcUsername = rpcUsername,

View File

@ -160,7 +160,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
val random = SplittableRandom(seed) val random = SplittableRandom(seed)
connectToNodes( connectToNodes(
configuration.sshUser, configuration.sshUser,
configuration.nodeHosts.map { it to configuration.remoteNodeDirectory / "certificates" }, configuration.nodeHosts,
configuration.remoteMessagingPort, configuration.remoteMessagingPort,
PortAllocation.Incremental(configuration.localTunnelStartingPort), PortAllocation.Incremental(configuration.localTunnelStartingPort),
configuration.rpcUsername, configuration.rpcUsername,

90
verifier/build.gradle Normal file
View File

@ -0,0 +1,90 @@
apply plugin: 'kotlin'
apply plugin: 'net.corda.plugins.quasar-utils'
apply plugin: 'net.corda.plugins.publish-utils'
description 'Corda core'
buildscript {
repositories {
mavenCentral()
}
}
repositories {
mavenLocal()
mavenCentral()
jcenter()
maven {
url 'http://oss.sonatype.org/content/repositories/snapshots'
}
maven {
url 'https://dl.bintray.com/kotlin/exposed'
}
}
//noinspection GroovyAssignabilityCheck
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
sourceSets {
integrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
}
}
test {
resources {
srcDir "../config/test"
}
}
main {
resources {
srcDir "../config/dev"
}
}
}
dependencies {
compile project(":node-api")
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile "org.jetbrains.kotlinx:kotlinx-support-jdk8:0.3"
compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
// Log4J: logging framework (with SLF4J bindings)
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}"
compile "org.apache.logging.log4j:log4j-core:${log4j_version}"
integrationTestCompile project(":test-utils")
integrationTestCompile project(":client:mock")
// Integration test helpers
integrationTestCompile "junit:junit:$junit_version"
integrationTestCompile "org.apache.activemq:artemis-server:${artemis_version}"
}
task standaloneJar(type: Jar) {
// Create a fat jar by packing all deps into the output
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
exclude("META-INF/*.DSA")
exclude("META-INF/*.RSA")
exclude("META-INF/*.SF")
manifest {
attributes 'Main-Class': 'net.corda.verifier.Verifier'
}
archiveName "corda-verifier.jar"
}
task integrationTest(type: Test) {
testClassesDir = sourceSets.integrationTest.output.classesDir
classpath = sourceSets.integrationTest.runtimeClasspath
}

View File

@ -0,0 +1,230 @@
package net.corda.verifier
import net.corda.client.mock.*
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.WireTransaction
import java.io.ByteArrayInputStream
import java.math.BigInteger
import java.util.*
/**
* [GeneratedLedger] is a ledger with transactions that always verify.
* It provides generator methods, in particular [transactionGenerator] that generates a valid transaction and also
* returns the new state of the ledger.
*/
data class GeneratedLedger(
val transactions: List<WireTransaction>,
// notary -> outputs. We need to track this because of the unique-notary-on-inputs invariant
val availableOutputs: Map<Party, List<StateAndRef<ContractState>>>,
val attachments: Set<Attachment>,
val identities: Set<Party>
) {
val hashTransactionMap: Map<SecureHash, WireTransaction> by lazy { transactions.associateBy(WireTransaction::id) }
val attachmentMap: Map<SecureHash, Attachment> by lazy { attachments.associateBy(Attachment::id) }
val identityMap: Map<CompositeKey, Party> by lazy { identities.associateBy(Party::owningKey) }
companion object {
val empty = GeneratedLedger(emptyList(), emptyMap(), emptySet(), emptySet())
}
fun resolveWireTransaction(transaction: WireTransaction): LedgerTransaction {
return transaction.toLedgerTransaction(
resolveIdentity = { identityMap[it] },
resolveAttachment = { attachmentMap[it] },
resolveStateRef = { hashTransactionMap[it.txhash]?.outputs?.get(it.index) }
)
}
val attachmentsGenerator: Generator<List<Attachment>> by lazy {
Generator.replicatePoisson(1.0, pickOneOrMaybeNew(attachments, attachmentGenerator))
}
val commandsGenerator: Generator<List<Pair<Command, Party>>> by lazy {
Generator.replicatePoisson(4.0, commandGenerator(identities))
}
/**
* Generates an issuance(root) transaction.
* Invariants: The input list must be empty.
*/
val issuanceGenerator: Generator<Pair<WireTransaction, GeneratedLedger>> by lazy {
val outputsGen = outputsGenerator.bind { outputs ->
Generator.sequence(
outputs.map { output ->
pickOneOrMaybeNew(identities, partyGenerator).map { notary ->
TransactionState(output, notary, null)
}
}
)
}
attachmentsGenerator.combine(outputsGen, commandsGenerator) { txAttachments, outputs, commands ->
val signers = commands.flatMap { it.first.signers }
val newTransaction = WireTransaction(
emptyList(),
txAttachments.map { it.id },
outputs,
commands.map { it.first },
null,
signers,
TransactionType.General(),
null
)
val newOutputStateAndRefs = outputs.mapIndexed { i, state ->
StateAndRef(state, StateRef(newTransaction.id, i))
}
val newAvailableOutputs = availableOutputs + newOutputStateAndRefs.groupBy { it.state.notary }
val newAttachments = attachments + txAttachments
val newIdentities = identities + commands.map { it.second } + outputs.map { it.notary }
val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities)
Pair(newTransaction, newLedger)
}
}
/**
* Generates a regular non-issue transaction.
* Invariants:
* * Input and output notaries must be one and the same.
*/
fun regularTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List<StateAndRef<ContractState>>): Generator<Pair<WireTransaction, GeneratedLedger>> {
val outputsGen = outputsGenerator.map { outputs ->
outputs.map { output ->
TransactionState(output, inputNotary, null)
}
}
val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom)
return inputsGen.combine(attachmentsGenerator, outputsGen, commandsGenerator) { inputs, txAttachments, outputs, commands ->
val signers = commands.flatMap { it.first.signers } + inputNotary.owningKey
val newTransaction = WireTransaction(
inputs.map { it.ref },
txAttachments.map { it.id },
outputs,
commands.map { it.first },
inputNotary,
signers,
TransactionType.General(),
null
)
val newOutputStateAndRefs = outputs.mapIndexed { i, state ->
StateAndRef(state, StateRef(newTransaction.id, i))
}
val availableOutputsMinusConsumed = HashMap(availableOutputs)
if (inputs.size == inputsToChooseFrom.size) {
availableOutputsMinusConsumed.remove(inputNotary)
} else {
availableOutputsMinusConsumed[inputNotary] = inputsToChooseFrom - inputs
}
val newAvailableOutputs = availableOutputsMinusConsumed + newOutputStateAndRefs.groupBy { it.state.notary }
val newAttachments = attachments + txAttachments
val newIdentities = identities + commands.map { it.second }
val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities)
Pair(newTransaction, newLedger)
}
}
/**
* Generates a notary change transaction.
* Invariants:
* * Input notary must be different from the output ones.
* * All other data must stay the same.
*/
fun notaryChangeTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List<StateAndRef<ContractState>>): Generator<Pair<WireTransaction, GeneratedLedger>> {
val newNotaryGen = pickOneOrMaybeNew(identities - inputNotary, partyGenerator)
val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom)
return inputsGen.bind { inputs ->
val signers = inputs.flatMap { it.state.data.participants } + inputNotary.owningKey
val outputsGen = Generator.sequence(inputs.map { input -> newNotaryGen.map { TransactionState(input.state.data, it, null) } })
outputsGen.combine(attachmentsGenerator) { outputs, txAttachments ->
val newNotaries = outputs.map { it.notary }
val newTransaction = WireTransaction(
inputs.map { it.ref },
txAttachments.map { it.id },
outputs,
emptyList(),
inputNotary,
signers,
TransactionType.NotaryChange(),
null
)
val newOutputStateAndRefs = outputs.mapIndexed { i, state ->
StateAndRef(state, StateRef(newTransaction.id, i))
}
val availableOutputsMinusConsumed = HashMap(availableOutputs)
availableOutputsMinusConsumed[inputNotary] = inputsToChooseFrom - inputs
val newAvailableOutputs = availableOutputsMinusConsumed + newOutputStateAndRefs.groupBy { it.state.notary }
val newAttachments = attachments + txAttachments
val newIdentities = identities + newNotaries
val newLedger = GeneratedLedger(transactions + newTransaction, newAvailableOutputs, newAttachments, newIdentities)
Pair(newTransaction, newLedger)
}
}
}
/**
* Generates a valid transaction. It may be one of three types of issuance, regular and notary change. These have
* different invariants on notary fields.
*/
val transactionGenerator: Generator<Pair<WireTransaction, GeneratedLedger>> by lazy {
if (availableOutputs.isEmpty()) {
issuanceGenerator
} else {
Generator.pickOne(availableOutputs.keys.toList()).bind { inputNotary ->
val inputsToChooseFrom = availableOutputs[inputNotary]!!
Generator.frequency(
0.3 to issuanceGenerator,
0.4 to regularTransactionGenerator(inputNotary, inputsToChooseFrom),
0.3 to notaryChangeTransactionGenerator(inputNotary, inputsToChooseFrom)
)
}
}
}
}
data class GeneratedState(
val nonce: Long,
override val participants: List<CompositeKey>
) : ContractState {
override val contract = DummyContract()
}
class GeneratedAttachment(
val bytes: ByteArray
) : Attachment {
override val id = bytes.sha256()
override fun open() = ByteArrayInputStream(bytes)
}
class GeneratedCommandData(
val nonce: Long
) : CommandData
val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) }
val publicKeyGenerator = keyPairGenerator.map { it.public.composite }
val stateGenerator: Generator<ContractState> =
Generator.replicatePoisson(2.0, publicKeyGenerator).combine(Generator.long()) { participants, nonce ->
GeneratedState(nonce, participants)
}
fun commandGenerator(partiesToPickFrom: Collection<Party>): Generator<Pair<Command, Party>> {
return pickOneOrMaybeNew(partiesToPickFrom, partyGenerator).combine(Generator.long()) { signer, nonce ->
Pair(
Command(GeneratedCommandData(nonce), signer.owningKey),
signer
)
}
}
val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGenerator) { n, key -> Party("Party$n", key) }
fun <A> pickOneOrMaybeNew(from: Collection<A>, generator: Generator<A>): Generator<A> {
if (from.isEmpty()) {
return generator
} else {
return generator.bind {
Generator.pickOne(from + it)
}
}
}
val attachmentGenerator: Generator<Attachment> = Generator.bytes(16).map(::GeneratedAttachment)
val outputsGenerator = Generator.replicatePoisson(3.0, stateGenerator)

View File

@ -0,0 +1,281 @@
package net.corda.verifier
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.ListeningScheduledExecutorService
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import net.corda.core.div
import net.corda.core.map
import net.corda.core.random63BitValue
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.ProcessUtilities
import net.corda.core.utilities.loggerFor
import net.corda.node.driver.*
import net.corda.node.services.config.configureDevKeyAndTrustStores
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.security.CheckType
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
/**
* This file defines an extension to [DriverDSL] that allows starting of verifier processes and
* lightweight verification requestors.
*/
interface VerifierExposedDSLInterface : DriverDSLExposedInterface {
/** Starts a lightweight verification requestor that implements the Node's Verifier API */
fun startVerificationRequestor(name: String): ListenableFuture<VerificationRequestorHandle>
/** Starts an out of process verifier connected to [address] */
fun startVerifier(address: HostAndPort): ListenableFuture<VerifierHandle>
/**
* Waits until [number] verifiers are listening for verification requests coming from the Node. Check
* [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors.
*/
fun NodeHandle.waitUntilNumberOfVerifiers(number: Int)
}
/** Starts a verifier connecting to the specified node */
fun VerifierExposedDSLInterface.startVerifier(nodeHandle: NodeHandle) =
startVerifier(nodeHandle.configuration.p2pAddress)
/** Starts a verifier connecting to the specified requestor */
fun VerifierExposedDSLInterface.startVerifier(verificationRequestorHandle: VerificationRequestorHandle) =
startVerifier(verificationRequestorHandle.p2pAddress)
interface VerifierInternalDSLInterface : DriverDSLInternalInterface, VerifierExposedDSLInterface
/**
* Behaves the same as [driver] and adds verifier-related functionality.
*/
fun <A> verifierDriver(
isDebug: Boolean = false,
driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
sshdPortAllocation: PortAllocation = PortAllocation.Incremental(20000),
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
automaticallyStartNetworkMap: Boolean = true,
dsl: VerifierExposedDSLInterface.() -> A
) = genericDriver(
driverDsl = VerifierDriverDSL(
DriverDSL(
portAllocation = portAllocation,
sshdPortAllocation = sshdPortAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
automaticallyStartNetworkMap = automaticallyStartNetworkMap,
isDebug = isDebug
)
),
coerce = { it },
dsl = dsl
)
/** A handle for a verifier */
data class VerifierHandle(
val process: Process
)
/** A handle for the verification requestor */
data class VerificationRequestorHandle(
val p2pAddress: HostAndPort,
private val responseAddress: SimpleString,
private val session: ClientSession,
private val requestProducer: ClientProducer,
private val addVerificationFuture: (Long, SettableFuture<Throwable?>) -> Unit,
private val executorService: ListeningScheduledExecutorService
) {
fun verifyTransaction(transaction: LedgerTransaction): ListenableFuture<Throwable?> {
val message = session.createMessage(false)
val verificationId = random63BitValue()
val request = VerifierApi.VerificationRequest(verificationId, transaction, responseAddress)
request.writeToClientMessage(message)
val verificationFuture = SettableFuture.create<Throwable?>()
addVerificationFuture(verificationId, verificationFuture)
requestProducer.send(message)
return verificationFuture
}
fun waitUntilNumberOfVerifiers(number: Int) {
poll(executorService, "$number verifiers to come online") {
if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) {
Unit
} else {
null
}
}.get()
}
}
data class VerifierDriverDSL(
val driverDSL: DriverDSL
) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface {
val verifierCount = AtomicInteger(0)
companion object {
private val log = loggerFor<VerifierDriverDSL>()
fun createConfiguration(baseDirectory: Path, nodeHostAndPort: HostAndPort): Config {
return ConfigFactory.parseMap(
mapOf(
"baseDirectory" to baseDirectory.toString(),
"nodeHostAndPort" to nodeHostAndPort.toString()
)
)
}
fun createVerificationRequestorArtemisConfig(baseDirectory: Path, responseAddress: String, hostAndPort: HostAndPort, sslConfiguration: SSLConfiguration): Configuration {
val connectionDirection = ConnectionDirection.Inbound(acceptorFactoryClassName = NettyAcceptorFactory::class.java.name)
return ConfigurationImpl().apply {
val artemisDir = "$baseDirectory/artemis"
bindingsDirectory = "$artemisDir/bindings"
journalDirectory = "$artemisDir/journal"
largeMessagesDirectory = "$artemisDir/large-messages"
acceptorConfigurations = setOf(ArtemisTcpTransport.tcpTransport(connectionDirection, hostAndPort, sslConfiguration))
queueConfigurations = listOf(
CoreQueueConfiguration().apply {
name = VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
address = VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
isDurable = false
},
CoreQueueConfiguration().apply {
name = responseAddress
address = responseAddress
isDurable = false
}
)
}
}
}
override fun startVerificationRequestor(name: String): ListenableFuture<VerificationRequestorHandle> {
val hostAndPort = driverDSL.portAllocation.nextHostAndPort()
return driverDSL.executorService.submit<VerificationRequestorHandle> {
startVerificationRequestorInternal(name, hostAndPort)
}
}
private fun startVerificationRequestorInternal(name: String, hostAndPort: HostAndPort): VerificationRequestorHandle {
val baseDir = driverDSL.driverDirectory / name
val sslConfig = object : SSLConfiguration {
override val certificatesDirectory = baseDir / "certificates"
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
}
sslConfig.configureDevKeyAndTrustStores(name)
val responseQueueNonce = random63BitValue()
val responseAddress = "${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.$responseQueueNonce"
val artemisConfig = createVerificationRequestorArtemisConfig(baseDir, responseAddress, hostAndPort, sslConfig)
val securityManager = object : ActiveMQSecurityManager {
// We don't need auth, SSL is good enough
override fun validateUser(user: String?, password: String?) = true
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?) = true
}
val server = ActiveMQServerImpl(artemisConfig, securityManager)
log.info("Starting verification requestor Artemis server with base dir $baseDir")
server.start()
driverDSL.shutdownManager.registerShutdown(Futures.immediateFuture {
server.stop()
})
val locator = ActiveMQClient.createServerLocatorWithoutHA()
val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfig)
val sessionFactory = locator.createSessionFactory(transport)
val session = sessionFactory.createSession()
driverDSL.shutdownManager.registerShutdown(Futures.immediateFuture {
session.stop()
sessionFactory.close()
})
val producer = session.createProducer(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)
val consumer = session.createConsumer(responseAddress)
// We demux the individual txs ourselves to avoid race when a new verifier is added
val verificationResponseFutures = ConcurrentHashMap<Long, SettableFuture<Throwable?>>()
consumer.setMessageHandler {
val result = VerifierApi.VerificationResponse.fromClientMessage(it)
val resultFuture = verificationResponseFutures.remove(result.verificationId)
log.info("${verificationResponseFutures.size} verifications left")
if (resultFuture != null) {
resultFuture.set(result.exception)
} else {
log.warn("Verification requestor $name can't find tx result future with id ${result.verificationId}, possible dupe")
}
}
session.start()
return VerificationRequestorHandle(
p2pAddress = hostAndPort,
responseAddress = SimpleString(responseAddress),
session = session,
requestProducer = producer,
addVerificationFuture = { verificationNonce, future ->
verificationResponseFutures.put(verificationNonce, future)
},
executorService = driverDSL.executorService
)
}
override fun startVerifier(address: HostAndPort): ListenableFuture<VerifierHandle> {
log.info("Starting verifier connecting to address $address")
val id = verifierCount.andIncrement
val jdwpPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null
val processFuture = driverDSL.executorService.submit<Process> {
val verifierName = "verifier$id"
val baseDirectory = driverDSL.driverDirectory / verifierName
val config = createConfiguration(baseDirectory, address)
val configFilename = "verifier.conf"
writeConfig(baseDirectory, configFilename, config)
Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName)
ProcessUtilities.startJavaProcess<Verifier>(listOf(baseDirectory.toString()), jdwpPort = jdwpPort)
}
driverDSL.shutdownManager.registerProcessShutdown(processFuture)
return processFuture.map(::VerifierHandle)
}
private fun <A> NodeHandle.connectToNode(closure: (ClientSession) -> A): A {
val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), configuration.p2pAddress, configuration)
val locator = ActiveMQClient.createServerLocatorWithoutHA(transport)
val sessionFactory = locator.createSessionFactory()
val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize)
try {
return closure(session)
} finally {
session.close()
}
}
override fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) {
connectToNode { session ->
poll(driverDSL.executorService, "$number verifiers to come online") {
if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) {
Unit
} else {
null
}
}.get()
}
}
}

View File

@ -0,0 +1,125 @@
package net.corda.verifier
import com.google.common.util.concurrent.Futures
import net.corda.client.mock.generateOrFail
import net.corda.core.contracts.DOLLARS
import net.corda.core.map
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.config.VerifierType
import net.corda.node.services.transactions.ValidatingNotaryService
import org.junit.Test
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
class VerifierTests {
private fun generateTransactions(number: Int): List<LedgerTransaction> {
var currentLedger = GeneratedLedger.empty
val transactions = ArrayList<WireTransaction>()
val random = SplittableRandom()
for (i in 0..number - 1) {
val (tx, ledger) = currentLedger.transactionGenerator.generateOrFail(random)
transactions.add(tx)
currentLedger = ledger
}
return transactions.map { currentLedger.resolveWireTransaction(it) }
}
@Test
fun `single verifier works with requestor`() {
verifierDriver(automaticallyStartNetworkMap = false) {
val aliceFuture = startVerificationRequestor("Alice")
val transactions = generateTransactions(100)
val alice = aliceFuture.get()
startVerifier(alice)
alice.waitUntilNumberOfVerifiers(1)
val results = Futures.allAsList(transactions.map { alice.verifyTransaction(it) }).get()
results.forEach {
if (it != null) {
throw it
}
}
}
}
@Test
fun `multiple verifiers work with requestor`() {
verifierDriver(automaticallyStartNetworkMap = false) {
val aliceFuture = startVerificationRequestor("Alice")
val transactions = generateTransactions(100)
val alice = aliceFuture.get()
val numberOfVerifiers = 4
for (i in 1..numberOfVerifiers) {
startVerifier(alice)
}
alice.waitUntilNumberOfVerifiers(numberOfVerifiers)
val results = Futures.allAsList(transactions.map { alice.verifyTransaction(it) }).get()
results.forEach {
if (it != null) {
throw it
}
}
}
}
@Test
fun `verification redistributes on verifier death`() {
verifierDriver(automaticallyStartNetworkMap = false) {
val aliceFuture = startVerificationRequestor("Alice")
val numberOfTransactions = 100
val transactions = generateTransactions(numberOfTransactions)
val alice = aliceFuture.get()
val verifier1 = startVerifier(alice)
val verifier2 = startVerifier(alice)
val verifier3 = startVerifier(alice)
alice.waitUntilNumberOfVerifiers(3)
val remainingTransactionsCount = AtomicInteger(numberOfTransactions)
val futures = transactions.map { transaction ->
val future = alice.verifyTransaction(transaction)
// Kill verifiers as results are coming in, forcing artemis to redistribute.
future.map {
val remaining = remainingTransactionsCount.decrementAndGet()
when (remaining) {
33 -> verifier1.get().process.destroy()
66 -> verifier2.get().process.destroy()
}
it
}
}
Futures.allAsList(futures).get()
}
}
@Test
fun `verification request waits until verifier comes online`() {
verifierDriver(automaticallyStartNetworkMap = false) {
val aliceFuture = startVerificationRequestor("Alice")
val transactions = generateTransactions(100)
val alice = aliceFuture.get()
val futures = transactions.map { alice.verifyTransaction(it) }
startVerifier(alice)
Futures.allAsList(futures).get()
}
}
@Test
fun `single verifier works with a node`() {
verifierDriver {
val aliceFuture = startNode("Alice")
val notaryFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)), verifierType = VerifierType.OutOfProcess)
val alice = aliceFuture.get()
val notary = notaryFuture.get()
startVerifier(notary)
alice.rpc.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, notaryFuture.get().nodeInfo.notaryIdentity).returnValue.get()
notary.waitUntilNumberOfVerifiers(1)
for (i in 1..10) {
alice.rpc.startFlow(::CashPaymentFlow, 10.DOLLARS, alice.nodeInfo.legalIdentity).returnValue.get()
}
}
}
}

View File

@ -0,0 +1,81 @@
package net.corda.verifier
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.core.ErrorOr
import net.corda.core.div
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.getValue
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import java.nio.file.Path
import java.nio.file.Paths
data class VerifierConfiguration(
val baseDirectory: Path,
val config: Config
) : SSLConfiguration {
val nodeHostAndPort: HostAndPort by config
override val keyStorePassword: String by config
override val trustStorePassword: String by config
override val certificatesDirectory = baseDirectory / "certificates"
}
class Verifier {
companion object {
private val log = loggerFor<Verifier>()
fun loadConfiguration(baseDirectory: Path, configPath: Path): VerifierConfiguration {
val defaultConfig = ConfigFactory.parseResources("verifier-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false))
val customConfig = ConfigFactory.parseFile(configPath.toFile(), ConfigParseOptions.defaults().setAllowMissing(false))
val resolvedConfig = customConfig.withFallback(defaultConfig).resolve()
return VerifierConfiguration(baseDirectory, resolvedConfig)
}
@JvmStatic
fun main(args: Array<String>) {
require(args.isNotEmpty()) { "Usage: <binary> BASE_DIR_CONTAINING_VERIFIER_CONF" }
val baseDirectory = Paths.get(args[0])
val verifierConfig = loadConfiguration(baseDirectory, baseDirectory / "verifier.conf")
val locator = ActiveMQClient.createServerLocatorWithHA(
tcpTransport(ConnectionDirection.Outbound(), verifierConfig.nodeHostAndPort, verifierConfig)
)
val sessionFactory = locator.createSessionFactory()
val session = sessionFactory.createSession(
VerifierApi.VERIFIER_USERNAME, VerifierApi.VERIFIER_USERNAME, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize
)
Runtime.getRuntime().addShutdownHook(Thread {
log.info("Shutting down")
session.close()
sessionFactory.close()
})
val consumer = session.createConsumer(VERIFICATION_REQUESTS_QUEUE_NAME)
val replyProducer = session.createProducer()
consumer.setMessageHandler {
val request = VerifierApi.VerificationRequest.fromClientMessage(it)
log.debug { "Received verification request with id ${request.verificationId}" }
val result = ErrorOr.catch {
request.transaction.verify()
}
if (result.error != null) {
log.debug { "Verification returned with error ${result.error}" }
}
val reply = session.createMessage(false)
val response = VerifierApi.VerificationResponse(request.verificationId, result.error)
response.writeToClientMessage(reply)
replyProducer.send(request.responseAddress, reply)
it.acknowledge()
}
session.start()
log.info("Verifier started")
Thread.sleep(Long.MAX_VALUE)
}
}
}

View File

@ -0,0 +1,3 @@
# nodeHostAndPort = "localhost:12345"
keyStorePassword = "cordacadevpass"
trustStorePassword = "trustpass"