mirror of
https://github.com/corda/corda.git
synced 2025-02-07 03:29:19 +00:00
Merge pull request #1073 from corda/mkit-os-ent-merge
OS -> ENT Merge e00c7706c3f52d0e338f83d169e60c99d623f1c5
This commit is contained in:
commit
2497b0b90a
@ -16,7 +16,6 @@ import javafx.beans.property.SimpleObjectProperty
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
@ -32,7 +31,6 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
@ -135,7 +133,7 @@ class NodeMonitorModel {
|
||||
}
|
||||
}
|
||||
|
||||
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password)
|
||||
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = false)
|
||||
|
||||
// Extract the flow tracking stream
|
||||
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
|
||||
@ -156,9 +154,9 @@ class NodeMonitorModel {
|
||||
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
|
||||
}
|
||||
|
||||
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List<StateMachineInfo> {
|
||||
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List<StateMachineInfo> {
|
||||
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry)
|
||||
val proxy = connection.proxy
|
||||
|
||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||
@ -176,7 +174,7 @@ class NodeMonitorModel {
|
||||
// force closing the connection to avoid propagation of notification to the server side.
|
||||
connection.forceClose()
|
||||
// Perform re-connect.
|
||||
performRpcReconnect(nodeHostAndPort, username, password)
|
||||
performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true)
|
||||
})
|
||||
|
||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||
@ -186,7 +184,7 @@ class NodeMonitorModel {
|
||||
return stateMachineInfos
|
||||
}
|
||||
|
||||
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection {
|
||||
|
||||
val retryInterval = 5.seconds
|
||||
|
||||
@ -205,21 +203,15 @@ class NodeMonitorModel {
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch (throwable: Throwable) {
|
||||
when (throwable) {
|
||||
is ActiveMQException, is RPCException -> {
|
||||
// Happens when:
|
||||
// * incorrect credentials provided;
|
||||
// * incorrect endpoint specified;
|
||||
// - no point to retry connecting.
|
||||
throw throwable
|
||||
}
|
||||
else -> {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + throwable.message)
|
||||
null
|
||||
}
|
||||
if (shouldRetry) {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + throwable.message)
|
||||
null
|
||||
} else {
|
||||
throw throwable
|
||||
}
|
||||
}
|
||||
|
||||
if (connection != null) {
|
||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||
return connection
|
||||
|
@ -6,6 +6,7 @@ release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
----------
|
||||
* Remove all references to the out-of-process transaction verification.
|
||||
|
||||
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
|
||||
|
||||
|
@ -90,7 +90,6 @@ The following modules are available but we do not commit to their stability or c
|
||||
* **net.corda.tools.explorer**: a GUI front-end for Corda
|
||||
* **net.corda.tools.graphs**: utilities to infer project dependencies
|
||||
* **net.corda.tools.loadtest**: Corda load tests
|
||||
* **net.corda.verifier**: allows out-of-node transaction verification, allowing verification to scale horizontally
|
||||
* **net.corda.webserver**: is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node
|
||||
* **net.corda.sandbox-creator**: sandbox utilities
|
||||
* **net.corda.quasar.hook**: agent to hook into Quasar and provide types exclusion lists
|
||||
|
@ -23,5 +23,4 @@ The Corda repository comprises the following folders:
|
||||
mock network) implementation
|
||||
* **tools** contains the explorer which is a GUI front-end for Corda, and also the DemoBench which is a GUI tool that
|
||||
allows you to run Corda nodes locally for demonstrations
|
||||
* **verifier** allows out-of-node transaction verification, allowing verification to scale horizontally
|
||||
* **webserver** is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node
|
@ -232,7 +232,7 @@ In general, the requirements outlined in this design are cross-cutting concerns
|
||||
* `FlowErrorAuditEvent` (unused)
|
||||
* `SystemAuditEvent` (unused)
|
||||
* Modules impacted
|
||||
* All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, verifier, webserver, jackson, jfx, mock, rpc*
|
||||
* All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, webserver, jackson, jfx, mock, rpc*
|
||||
|
||||
### Functional
|
||||
|
||||
@ -458,7 +458,6 @@ Corda subsystem components:
|
||||
| NotaryService | RaftNonValidatingNotaryService | as above |
|
||||
| NotaryService | BFTNonValidatingNotaryService | Logging coverage (info, debug) |
|
||||
| Doorman | DoormanServer (Enterprise only) | Some logging (info, warn, error), and use of `println` |
|
||||
| TransactionVerifierService | OutOfProcessTransactionVerifierService (Enterprise only) | some logging (info) |
|
||||
| | | |
|
||||
|
||||
Corda core flows:
|
||||
|
@ -1,26 +0,0 @@
|
||||
Out-of-process verification
|
||||
===========================
|
||||
|
||||
A Corda node does transaction verification through ``ServiceHub.transactionVerifierService``. This is by default an
|
||||
``InMemoryTransactionVerifierService`` which just verifies transactions in-process.
|
||||
|
||||
Corda may be configured to use out of process verification. Any number of verifiers may be started connecting to a node
|
||||
through the node's exposed artemis SSL port. The messaging layer takes care of load balancing.
|
||||
|
||||
.. note:: We plan to introduce kernel level sandboxing around the out of process verifiers as an additional line of
|
||||
defence in case of inner sandbox escapes.
|
||||
|
||||
To configure a node to use out of process verification specify the ``verifierType`` option in your node.conf:
|
||||
|
||||
.. literalinclude:: example-code/src/main/resources/example-out-of-process-verifier-node.conf
|
||||
:language: cfg
|
||||
|
||||
You can build a verifier jar using ``./gradlew verifier:standaloneJar``.
|
||||
|
||||
And run it with ``java -jar verifier/build/libs/corda-verifier.jar <PATH_TO_VERIFIER_BASE_DIR>``.
|
||||
|
||||
``PATH_TO_VERIFIER_BASE_DIR`` should contain a ``certificates`` folder akin to the one in a node directory, and a
|
||||
``verifier.conf`` containing the following:
|
||||
|
||||
.. literalinclude:: example-code/src/main/resources/example-verifier.conf
|
||||
:language: cfg
|
@ -1,72 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.nodeapi
|
||||
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.sequence
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.reader.MessageUtil
|
||||
|
||||
object VerifierApi {
|
||||
const val VERIFIER_USERNAME = "SystemUsers/Verifier"
|
||||
const val VERIFICATION_REQUESTS_QUEUE_NAME = "verifier.requests"
|
||||
const val VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX = "verifier.responses"
|
||||
private const val VERIFICATION_ID_FIELD_NAME = "id"
|
||||
private const val RESULT_EXCEPTION_FIELD_NAME = "result-exception"
|
||||
|
||||
data class VerificationRequest(
|
||||
val verificationId: Long,
|
||||
val transaction: LedgerTransaction,
|
||||
val responseAddress: SimpleString
|
||||
) {
|
||||
companion object {
|
||||
fun fromClientMessage(message: ClientMessage): ObjectWithCompatibleContext<VerificationRequest> {
|
||||
val bytes = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
val bytesSequence = bytes.sequence()
|
||||
val (transaction, context) = bytesSequence.deserializeWithCompatibleContext<LedgerTransaction>()
|
||||
val request = VerificationRequest(
|
||||
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
|
||||
transaction,
|
||||
MessageUtil.getJMSReplyTo(message))
|
||||
return ObjectWithCompatibleContext(request, context)
|
||||
}
|
||||
}
|
||||
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
|
||||
message.writeBodyBufferBytes(transaction.serialize().bytes)
|
||||
MessageUtil.setJMSReplyTo(message, responseAddress)
|
||||
}
|
||||
}
|
||||
|
||||
data class VerificationResponse(
|
||||
val verificationId: Long,
|
||||
val exception: Throwable?
|
||||
) {
|
||||
companion object {
|
||||
fun fromClientMessage(message: ClientMessage): VerificationResponse {
|
||||
return VerificationResponse(
|
||||
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
|
||||
message.getBytesProperty(RESULT_EXCEPTION_FIELD_NAME)?.deserialize()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun writeToClientMessage(message: ClientMessage, context: SerializationContext) {
|
||||
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
|
||||
if (exception != null) {
|
||||
message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize(context = context).bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Binary file not shown.
Binary file not shown.
@ -0,0 +1,41 @@
|
||||
package net.corda.nodeapi.internal.crypto
|
||||
|
||||
import net.corda.core.internal.validate
|
||||
import net.corda.nodeapi.internal.DEV_CA_TRUST_STORE_FILE
|
||||
import net.corda.nodeapi.internal.DEV_CA_TRUST_STORE_PASS
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.cert.TrustAnchor
|
||||
import java.security.cert.X509Certificate
|
||||
|
||||
class DevCertificatesTest {
|
||||
private companion object {
|
||||
const val OLD_DEV_KEYSTORE_PASS = "password"
|
||||
const val OLD_NODE_DEV_KEYSTORE_FILE_NAME = "nodekeystore.jks"
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val tempFolder = TemporaryFolder()
|
||||
|
||||
@Test
|
||||
fun `create server certificate in keystore for SSL`() {
|
||||
// given
|
||||
val newTrustStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/$DEV_CA_TRUST_STORE_FILE"), DEV_CA_TRUST_STORE_PASS)
|
||||
val newTrustRoot = newTrustStore.getX509Certificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val newTrustAnchor = TrustAnchor(newTrustRoot, null)
|
||||
|
||||
val oldNodeCaKeyStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("regression-test/$OLD_NODE_DEV_KEYSTORE_FILE_NAME"), OLD_DEV_KEYSTORE_PASS)
|
||||
val oldX509Certificates = oldNodeCaKeyStore.getCertificateChain(X509Utilities.CORDA_CLIENT_CA).map {
|
||||
it as X509Certificate
|
||||
}.toTypedArray()
|
||||
|
||||
val certPath = X509Utilities.buildCertPath(*oldX509Certificates)
|
||||
|
||||
// when
|
||||
certPath.validate(newTrustAnchor)
|
||||
|
||||
// then no exception is thrown
|
||||
}
|
||||
}
|
BIN
node-api/src/test/resources/regression-test/nodekeystore.jks
Normal file
BIN
node-api/src/test/resources/regression-test/nodekeystore.jks
Normal file
Binary file not shown.
BIN
node-api/src/test/resources/regression-test/sslkeystore.jks
Normal file
BIN
node-api/src/test/resources/regression-test/sslkeystore.jks
Normal file
Binary file not shown.
@ -94,7 +94,7 @@ class CertificateRevocationListNodeTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Simple AMPQ Client to Server connection works`() {
|
||||
fun `Simple AMPQ Client to Server connection works and soft fail is enabled`() {
|
||||
val crlCheckSoftFail = true
|
||||
val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail)
|
||||
amqpServer.use {
|
||||
@ -126,7 +126,39 @@ class CertificateRevocationListNodeTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client to Server connection fails when client's certificate is revoked`() {
|
||||
fun `Simple AMPQ Client to Server connection works and soft fail is disabled`() {
|
||||
val crlCheckSoftFail = false
|
||||
val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail)
|
||||
amqpServer.use {
|
||||
amqpServer.start()
|
||||
val receiveSubs = amqpServer.onReceive.subscribe {
|
||||
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
|
||||
assertEquals(P2P_PREFIX + "Test", it.topic)
|
||||
assertEquals("Test", String(it.payload))
|
||||
it.complete(true)
|
||||
}
|
||||
val (amqpClient, _) = createClient(serverPort, crlCheckSoftFail)
|
||||
amqpClient.use {
|
||||
val serverConnected = amqpServer.onConnection.toFuture()
|
||||
val clientConnected = amqpClient.onConnection.toFuture()
|
||||
amqpClient.start()
|
||||
val serverConnect = serverConnected.get()
|
||||
assertEquals(true, serverConnect.connected)
|
||||
val clientConnect = clientConnected.get()
|
||||
assertEquals(true, clientConnect.connected)
|
||||
val msg = amqpClient.createMessage("Test".toByteArray(),
|
||||
P2P_PREFIX + "Test",
|
||||
ALICE_NAME.toString(),
|
||||
emptyMap())
|
||||
amqpClient.write(msg)
|
||||
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
|
||||
receiveSubs.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client to Server connection fails when client's certificate is revoked and soft fail is enabled`() {
|
||||
val crlCheckSoftFail = true
|
||||
val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail)
|
||||
amqpServer.use {
|
||||
@ -146,6 +178,27 @@ class CertificateRevocationListNodeTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client to Server connection fails when client's certificate is revoked and soft fail is disabled`() {
|
||||
val crlCheckSoftFail = false
|
||||
val (amqpServer, _) = createServer(serverPort, crlCheckSoftFail = crlCheckSoftFail)
|
||||
amqpServer.use {
|
||||
amqpServer.start()
|
||||
amqpServer.onReceive.subscribe {
|
||||
it.complete(true)
|
||||
}
|
||||
val (amqpClient, clientCert) = createClient(serverPort, crlCheckSoftFail)
|
||||
revokedNodeCerts.add(clientCert.serialNumber)
|
||||
amqpClient.use {
|
||||
val serverConnected = amqpServer.onConnection.toFuture()
|
||||
amqpClient.onConnection.toFuture()
|
||||
amqpClient.start()
|
||||
val serverConnect = serverConnected.get()
|
||||
assertEquals(false, serverConnect.connected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client to Server connection fails when servers's certificate is revoked`() {
|
||||
val crlCheckSoftFail = true
|
||||
|
@ -54,7 +54,6 @@ import net.corda.node.services.messaging.InternalRPCMessagingClient
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.P2PMessagingClient
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
import net.corda.node.services.messaging.VerifierMessagingClient
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
@ -138,10 +137,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
override val log: Logger get() = staticLog
|
||||
override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //verifierMessagingClient!!.verifierService
|
||||
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
}
|
||||
override fun makeTransactionVerifierService(): TransactionVerifierService = InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
|
||||
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
|
||||
|
||||
@ -232,10 +228,6 @@ open class Node(configuration: NodeConfiguration,
|
||||
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
||||
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
|
||||
}
|
||||
verifierMessagingClient = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
|
||||
VerifierType.InMemory -> null
|
||||
}
|
||||
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
|
||||
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
|
||||
return P2PMessagingClient(
|
||||
@ -337,10 +329,6 @@ open class Node(configuration: NodeConfiguration,
|
||||
runOnStop += this::close
|
||||
init(rpcOps, securityManager)
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
runOnStop += this::stop
|
||||
start()
|
||||
}
|
||||
(network as P2PMessagingClient).apply {
|
||||
runOnStop += this::stop
|
||||
start()
|
||||
@ -443,12 +431,10 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
|
||||
private var verifierMessagingClient: VerifierMessagingClient? = null
|
||||
|
||||
/** Starts a blocking event loop for message dispatch. */
|
||||
fun run() {
|
||||
internalRpcMessagingClient?.start(rpcBroker!!.serverControl)
|
||||
verifierMessagingClient?.start2()
|
||||
(network as P2PMessagingClient).run()
|
||||
}
|
||||
|
||||
|
@ -184,7 +184,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
.asSequence()
|
||||
// This is to only scan classes from test folders.
|
||||
.filter { url ->
|
||||
listOf("main", "production/classes").none { url.toString().contains("$it/$resource") } || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) }
|
||||
!url.toString().contains("main/$resource") || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) }
|
||||
}
|
||||
.map { url ->
|
||||
if (url.protocol == "jar") {
|
||||
|
@ -455,8 +455,7 @@ data class NodeH2Settings(
|
||||
)
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
InMemory
|
||||
}
|
||||
|
||||
enum class CertChainPolicyType {
|
||||
|
@ -1,84 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<VerifierMessagingClient>()
|
||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||
}
|
||||
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
/** An executor for sending messages */
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
private var verificationResponseConsumer: ClientConsumer? = null
|
||||
fun start(): Unit = synchronized(this) {
|
||||
val session = artemis.start().session
|
||||
fun checkVerifierCount() {
|
||||
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
|
||||
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to create a durable queue on the broker which is bound to an address of the same name.
|
||||
fun createQueueIfAbsent(queueName: String) {
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||
}
|
||||
}
|
||||
createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||
createQueueIfAbsent(verifierResponseAddress)
|
||||
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
|
||||
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun start2() = synchronized(this) {
|
||||
verifierService.start(verificationResponseConsumer!!)
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
artemis.stop()
|
||||
}
|
||||
|
||||
internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction ->
|
||||
messagingExecutor.fetchFrom {
|
||||
sendRequest(nonce, transaction)
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) {
|
||||
val started = artemis.started!!
|
||||
val message = started.session.createMessage(false)
|
||||
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
|
||||
request.writeToClientMessage(message)
|
||||
started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
|
||||
}
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class OutOfProcessTransactionVerifierService(
|
||||
private val metrics: MetricRegistry,
|
||||
private val sendRequest: (Long, LedgerTransaction) -> Unit
|
||||
) : SingletonSerializeAsToken(), TransactionVerifierService {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private data class VerificationHandle(
|
||||
val transactionId: SecureHash,
|
||||
val resultFuture: OpenFuture<Unit>,
|
||||
val durationTimerContext: Timer.Context
|
||||
)
|
||||
|
||||
private val verificationHandles = ConcurrentHashMap<Long, VerificationHandle>()
|
||||
|
||||
// Metrics
|
||||
private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name"
|
||||
|
||||
private val durationTimer = metrics.timer(metric("Verification.Duration"))
|
||||
private val successMeter = metrics.meter(metric("Verification.Success"))
|
||||
private val failureMeter = metrics.meter(metric("Verification.Failure"))
|
||||
|
||||
class VerificationResultForUnknownTransaction(nonce: Long) :
|
||||
Exception("Verification result arrived for unknown transaction nonce $nonce")
|
||||
|
||||
fun start(responseConsumer: ClientConsumer) {
|
||||
log.info("Starting out of process verification service")
|
||||
metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
|
||||
responseConsumer.setMessageHandler { message ->
|
||||
val response = VerifierApi.VerificationResponse.fromClientMessage(message)
|
||||
val handle = verificationHandles.remove(response.verificationId) ?: throw VerificationResultForUnknownTransaction(response.verificationId)
|
||||
handle.durationTimerContext.stop()
|
||||
val exception = response.exception
|
||||
if (exception == null) {
|
||||
successMeter.mark()
|
||||
handle.resultFuture.set(Unit)
|
||||
} else {
|
||||
failureMeter.mark()
|
||||
handle.resultFuture.setException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun verify(transaction: LedgerTransaction): CordaFuture<*> {
|
||||
log.info("Verifying ${transaction.id}")
|
||||
val future = openFuture<Unit>()
|
||||
val nonce = random63BitValue()
|
||||
verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time())
|
||||
sendRequest(nonce, transaction)
|
||||
return future
|
||||
}
|
||||
}
|
@ -12,21 +12,50 @@ package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.FungibleAsset
|
||||
import net.corda.core.contracts.OwnableState
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.StatesNotAvailableException
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
|
||||
import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.node.services.vault.SortAttribute
|
||||
import net.corda.core.node.services.vault.builder
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.core.transactions.ContractUpgradeWireTransaction
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.FullTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
import org.hibernate.Session
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -142,7 +171,8 @@ class NodeVaultService(
|
||||
val ourNewStates = when (statesToRecord) {
|
||||
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
|
||||
StatesToRecord.ONLY_RELEVANT -> tx.outputs.withIndex().filter {
|
||||
isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet()) }
|
||||
isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet())
|
||||
}
|
||||
StatesToRecord.ALL_VISIBLE -> tx.outputs.withIndex()
|
||||
}.map { tx.outRef<ContractState>(it.index) }
|
||||
|
||||
@ -168,16 +198,13 @@ class NodeVaultService(
|
||||
else -> throw IllegalArgumentException("Unsupported transaction type: ${tx.javaClass.name}")
|
||||
}
|
||||
val myKeys by lazy { keyManagementService.filterMyKeys(ltx.outputs.flatMap { it.data.participants.map { it.owningKey } }) }
|
||||
val (consumedStateAndRefs, producedStates) = ltx.inputs.
|
||||
zip(ltx.outputs).
|
||||
filter { (_, output) ->
|
||||
if (statesToRecord == StatesToRecord.ONLY_RELEVANT) {
|
||||
isRelevant(output.data, myKeys.toSet())
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}.
|
||||
unzip()
|
||||
val (consumedStateAndRefs, producedStates) = ltx.inputs.zip(ltx.outputs).filter { (_, output) ->
|
||||
if (statesToRecord == StatesToRecord.ONLY_RELEVANT) {
|
||||
isRelevant(output.data, myKeys.toSet())
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}.unzip()
|
||||
|
||||
val producedStateAndRefs = producedStates.map { ltx.outRef<ContractState>(it.data) }
|
||||
if (consumedStateAndRefs.isEmpty() && producedStateAndRefs.isEmpty()) {
|
||||
@ -403,7 +430,7 @@ class NodeVaultService(
|
||||
if (!seen) {
|
||||
val contractInterfaces = deriveContractInterfaces(concreteType)
|
||||
contractInterfaces.map {
|
||||
val contractInterface = contractStateTypeMappings.getOrPut(it.name, { mutableSetOf() })
|
||||
val contractInterface = contractStateTypeMappings.getOrPut(it.name) { mutableSetOf() }
|
||||
contractInterface.add(concreteType.name)
|
||||
}
|
||||
}
|
||||
@ -471,7 +498,7 @@ class NodeVaultService(
|
||||
if (!paging.isDefault && index == paging.pageSize) // skip last result if paged
|
||||
return@forEachIndexed
|
||||
val vaultState = result[0] as VaultSchemaV1.VaultStates
|
||||
val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId!!), vaultState.stateRef!!.index!!)
|
||||
val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId), vaultState.stateRef!!.index)
|
||||
stateRefs.add(stateRef)
|
||||
statesMeta.add(Vault.StateMetadata(stateRef,
|
||||
vaultState.contractStateClassName,
|
||||
@ -529,13 +556,24 @@ class NodeVaultService(
|
||||
val distinctTypes = results.map { it }
|
||||
|
||||
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableSet<String>>()
|
||||
val unknownTypes = mutableSetOf<String>()
|
||||
distinctTypes.forEach { type ->
|
||||
val concreteType: Class<ContractState> = uncheckedCast(Class.forName(type))
|
||||
val contractInterfaces = deriveContractInterfaces(concreteType)
|
||||
contractInterfaces.map {
|
||||
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableSetOf() })
|
||||
contractInterface.add(concreteType.name)
|
||||
val concreteType: Class<ContractState>? = try {
|
||||
uncheckedCast(Class.forName(type))
|
||||
} catch (e: ClassNotFoundException) {
|
||||
unknownTypes += type
|
||||
null
|
||||
}
|
||||
concreteType?.let {
|
||||
val contractInterfaces = deriveContractInterfaces(it)
|
||||
contractInterfaces.map {
|
||||
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name) { mutableSetOf() }
|
||||
contractInterface.add(it.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (unknownTypes.isNotEmpty()) {
|
||||
log.warn("There are unknown contract state types in the vault, which will prevent these states from being used. The relevant CorDapps must be loaded for these states to be used. The types not on the classpath are ${unknownTypes.joinToString(", ", "[", "]")}.")
|
||||
}
|
||||
return contractInterfaceToConcreteTypes
|
||||
}
|
||||
|
@ -25,12 +25,10 @@ import net.corda.core.flows.FlowException
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.AbstractAttachment
|
||||
import net.corda.core.internal.x500Name
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA
|
||||
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
|
||||
import net.corda.serialization.internal.*
|
||||
import net.corda.serialization.internal.amqp.SerializerFactory.Companion.isPrimitive
|
||||
@ -45,6 +43,7 @@ import org.apache.qpid.proton.amqp.*
|
||||
import org.apache.qpid.proton.codec.DecoderImpl
|
||||
import org.apache.qpid.proton.codec.EncoderImpl
|
||||
import org.assertj.core.api.Assertions.*
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.cert.X509v2CRLBuilder
|
||||
import org.bouncycastle.cert.jcajce.JcaX509CRLConverter
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
@ -667,8 +666,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
val scheme = AMQPServerSerializationScheme(emptyList())
|
||||
val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" }
|
||||
.java.getDeclaredMethod("registerCustomSerializers",
|
||||
SerializationContext::class.java,
|
||||
SerializerFactory::class.java)
|
||||
SerializationContext::class.java,
|
||||
SerializerFactory::class.java)
|
||||
func.isAccessible = true
|
||||
|
||||
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
@ -1021,7 +1020,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
}
|
||||
|
||||
private fun emptyCrl(): X509CRL {
|
||||
val builder = X509v2CRLBuilder(CordaX500Name.build(DEV_INTERMEDIATE_CA.certificate.issuerX500Principal).x500Name, Date())
|
||||
val builder = X509v2CRLBuilder(X500Name("CN=Corda Root CA, O=R3 HoldCo LLC, L=New York, C=US"), Date())
|
||||
val provider = BouncyCastleProvider()
|
||||
val crlHolder = builder.build(ContentSignerBuilder.build(Crypto.RSA_SHA256, Crypto.generateKeyPair(Crypto.RSA_SHA256).private, provider))
|
||||
return JcaX509CRLConverter().setProvider(provider).getCRL(crlHolder)
|
||||
@ -1330,12 +1329,12 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
}
|
||||
|
||||
interface DataClassByInterface<V> {
|
||||
val v : V
|
||||
val v: V
|
||||
}
|
||||
|
||||
@Test
|
||||
fun dataClassBy() {
|
||||
data class C (val s: String) : DataClassByInterface<String> {
|
||||
data class C(val s: String) : DataClassByInterface<String> {
|
||||
override val v: String = "-- $s"
|
||||
}
|
||||
|
||||
@ -1349,8 +1348,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
|
||||
try {
|
||||
val i2 = DeserializationInput(testDefaultFactory()).deserialize(bytes)
|
||||
} catch (e : NotSerializableException) {
|
||||
throw Error ("Deserializing serialized \$C should not throw")
|
||||
} catch (e: NotSerializableException) {
|
||||
throw Error("Deserializing serialized \$C should not throw")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,8 +21,7 @@ import net.corda.testing.node.NotarySpec
|
||||
import java.nio.file.Path
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
InMemory
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user