mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
CORDA-1567 Remove all traces of the out-of-process verifier (#3424)
This commit is contained in:
parent
34e52551fd
commit
9be4c5dca4
@ -6,6 +6,7 @@ release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
----------
|
||||
* Remove all references to the out-of-process transaction verification.
|
||||
|
||||
* The class carpenter has a "lenient" mode where it will, during deserialisation, happily synthesis classes that implement
|
||||
interfaces that will have unimplemented methods. This is useful, for example, for object viewers. This can be turned on
|
||||
|
@ -90,7 +90,6 @@ The following modules are available but we do not commit to their stability or c
|
||||
* **net.corda.tools.explorer**: a GUI front-end for Corda
|
||||
* **net.corda.tools.graphs**: utilities to infer project dependencies
|
||||
* **net.corda.tools.loadtest**: Corda load tests
|
||||
* **net.corda.verifier**: allows out-of-node transaction verification, allowing verification to scale horizontally
|
||||
* **net.corda.webserver**: is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node
|
||||
* **net.corda.sandbox-creator**: sandbox utilities
|
||||
* **net.corda.quasar.hook**: agent to hook into Quasar and provide types exclusion lists
|
||||
|
@ -23,5 +23,4 @@ The Corda repository comprises the following folders:
|
||||
mock network) implementation
|
||||
* **tools** contains the explorer which is a GUI front-end for Corda, and also the DemoBench which is a GUI tool that
|
||||
allows you to run Corda nodes locally for demonstrations
|
||||
* **verifier** allows out-of-node transaction verification, allowing verification to scale horizontally
|
||||
* **webserver** is a servlet container for CorDapps that export HTTP endpoints. This server is an RPC client of the node
|
@ -232,7 +232,7 @@ In general, the requirements outlined in this design are cross-cutting concerns
|
||||
* `FlowErrorAuditEvent` (unused)
|
||||
* `SystemAuditEvent` (unused)
|
||||
* Modules impacted
|
||||
* All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, verifier, webserver, jackson, jfx, mock, rpc*
|
||||
* All modules packaged and shipped as part of a Corda distribution (as published to Artifactory / Maven): *core, node, node-api, node-driver, finance, confidential-identities, test-common, test-utils, webserver, jackson, jfx, mock, rpc*
|
||||
|
||||
### Functional
|
||||
|
||||
@ -458,7 +458,6 @@ Corda subsystem components:
|
||||
| NotaryService | RaftNonValidatingNotaryService | as above |
|
||||
| NotaryService | BFTNonValidatingNotaryService | Logging coverage (info, debug) |
|
||||
| Doorman | DoormanServer (Enterprise only) | Some logging (info, warn, error), and use of `println` |
|
||||
| TransactionVerifierService | OutOfProcessTransactionVerifierService (Enterprise only) | some logging (info) |
|
||||
| | | |
|
||||
|
||||
Corda core flows:
|
||||
|
@ -1,26 +0,0 @@
|
||||
Out-of-process verification
|
||||
===========================
|
||||
|
||||
A Corda node does transaction verification through ``ServiceHub.transactionVerifierService``. This is by default an
|
||||
``InMemoryTransactionVerifierService`` which just verifies transactions in-process.
|
||||
|
||||
Corda may be configured to use out of process verification. Any number of verifiers may be started connecting to a node
|
||||
through the node's exposed artemis SSL port. The messaging layer takes care of load balancing.
|
||||
|
||||
.. note:: We plan to introduce kernel level sandboxing around the out of process verifiers as an additional line of
|
||||
defence in case of inner sandbox escapes.
|
||||
|
||||
To configure a node to use out of process verification specify the ``verifierType`` option in your node.conf:
|
||||
|
||||
.. literalinclude:: example-code/src/main/resources/example-out-of-process-verifier-node.conf
|
||||
:language: cfg
|
||||
|
||||
You can build a verifier jar using ``./gradlew verifier:standaloneJar``.
|
||||
|
||||
And run it with ``java -jar verifier/build/libs/corda-verifier.jar <PATH_TO_VERIFIER_BASE_DIR>``.
|
||||
|
||||
``PATH_TO_VERIFIER_BASE_DIR`` should contain a ``certificates`` folder akin to the one in a node directory, and a
|
||||
``verifier.conf`` containing the following:
|
||||
|
||||
.. literalinclude:: example-code/src/main/resources/example-verifier.conf
|
||||
:language: cfg
|
@ -1,62 +0,0 @@
|
||||
package net.corda.nodeapi
|
||||
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.sequence
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.reader.MessageUtil
|
||||
|
||||
object VerifierApi {
|
||||
const val VERIFIER_USERNAME = "SystemUsers/Verifier"
|
||||
const val VERIFICATION_REQUESTS_QUEUE_NAME = "verifier.requests"
|
||||
const val VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX = "verifier.responses"
|
||||
private const val VERIFICATION_ID_FIELD_NAME = "id"
|
||||
private const val RESULT_EXCEPTION_FIELD_NAME = "result-exception"
|
||||
|
||||
data class VerificationRequest(
|
||||
val verificationId: Long,
|
||||
val transaction: LedgerTransaction,
|
||||
val responseAddress: SimpleString
|
||||
) {
|
||||
companion object {
|
||||
fun fromClientMessage(message: ClientMessage): ObjectWithCompatibleContext<VerificationRequest> {
|
||||
val bytes = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
val bytesSequence = bytes.sequence()
|
||||
val (transaction, context) = bytesSequence.deserializeWithCompatibleContext<LedgerTransaction>()
|
||||
val request = VerificationRequest(
|
||||
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
|
||||
transaction,
|
||||
MessageUtil.getJMSReplyTo(message))
|
||||
return ObjectWithCompatibleContext(request, context)
|
||||
}
|
||||
}
|
||||
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
|
||||
message.writeBodyBufferBytes(transaction.serialize().bytes)
|
||||
MessageUtil.setJMSReplyTo(message, responseAddress)
|
||||
}
|
||||
}
|
||||
|
||||
data class VerificationResponse(
|
||||
val verificationId: Long,
|
||||
val exception: Throwable?
|
||||
) {
|
||||
companion object {
|
||||
fun fromClientMessage(message: ClientMessage): VerificationResponse {
|
||||
return VerificationResponse(
|
||||
message.getLongProperty(VERIFICATION_ID_FIELD_NAME),
|
||||
message.getBytesProperty(RESULT_EXCEPTION_FIELD_NAME)?.deserialize()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun writeToClientMessage(message: ClientMessage, context: SerializationContext) {
|
||||
message.putLongProperty(VERIFICATION_ID_FIELD_NAME, verificationId)
|
||||
if (exception != null) {
|
||||
message.putBytesProperty(RESULT_EXCEPTION_FIELD_NAME, exception.serialize(context = context).bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -44,7 +44,6 @@ import net.corda.node.services.messaging.InternalRPCMessagingClient
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.P2PMessagingClient
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
import net.corda.node.services.messaging.VerifierMessagingClient
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
@ -125,10 +124,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
override val log: Logger get() = staticLog
|
||||
override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //verifierMessagingClient!!.verifierService
|
||||
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
}
|
||||
override fun makeTransactionVerifierService(): TransactionVerifierService = InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
|
||||
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
|
||||
|
||||
@ -212,10 +208,6 @@ open class Node(configuration: NodeConfiguration,
|
||||
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
||||
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
|
||||
}
|
||||
verifierMessagingClient = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
|
||||
VerifierType.InMemory -> null
|
||||
}
|
||||
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
|
||||
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
|
||||
return P2PMessagingClient(
|
||||
@ -311,10 +303,6 @@ open class Node(configuration: NodeConfiguration,
|
||||
runOnStop += this::close
|
||||
init(rpcOps, securityManager)
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
runOnStop += this::stop
|
||||
start()
|
||||
}
|
||||
(network as P2PMessagingClient).apply {
|
||||
runOnStop += this::stop
|
||||
start()
|
||||
@ -414,11 +402,10 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
|
||||
private var verifierMessagingClient: VerifierMessagingClient? = null
|
||||
|
||||
/** Starts a blocking event loop for message dispatch. */
|
||||
fun run() {
|
||||
internalRpcMessagingClient?.start(rpcBroker!!.serverControl)
|
||||
verifierMessagingClient?.start2()
|
||||
(network as P2PMessagingClient).run()
|
||||
}
|
||||
|
||||
|
@ -371,8 +371,7 @@ data class NodeH2Settings(
|
||||
)
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
InMemory
|
||||
}
|
||||
|
||||
enum class CertChainPolicyType {
|
||||
|
@ -1,74 +0,0 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<VerifierMessagingClient>()
|
||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||
}
|
||||
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
/** An executor for sending messages */
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
private var verificationResponseConsumer: ClientConsumer? = null
|
||||
fun start(): Unit = synchronized(this) {
|
||||
val session = artemis.start().session
|
||||
fun checkVerifierCount() {
|
||||
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
|
||||
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to create a durable queue on the broker which is bound to an address of the same name.
|
||||
fun createQueueIfAbsent(queueName: String) {
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||
}
|
||||
}
|
||||
createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||
createQueueIfAbsent(verifierResponseAddress)
|
||||
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
|
||||
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun start2() = synchronized(this) {
|
||||
verifierService.start(verificationResponseConsumer!!)
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
artemis.stop()
|
||||
}
|
||||
|
||||
internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction ->
|
||||
messagingExecutor.fetchFrom {
|
||||
sendRequest(nonce, transaction)
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) {
|
||||
val started = artemis.started!!
|
||||
val message = started.session.createMessage(false)
|
||||
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
|
||||
request.writeToClientMessage(message)
|
||||
started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
|
||||
}
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class OutOfProcessTransactionVerifierService(
|
||||
private val metrics: MetricRegistry,
|
||||
private val sendRequest: (Long, LedgerTransaction) -> Unit
|
||||
) : SingletonSerializeAsToken(), TransactionVerifierService {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private data class VerificationHandle(
|
||||
val transactionId: SecureHash,
|
||||
val resultFuture: OpenFuture<Unit>,
|
||||
val durationTimerContext: Timer.Context
|
||||
)
|
||||
|
||||
private val verificationHandles = ConcurrentHashMap<Long, VerificationHandle>()
|
||||
|
||||
// Metrics
|
||||
private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name"
|
||||
|
||||
private val durationTimer = metrics.timer(metric("Verification.Duration"))
|
||||
private val successMeter = metrics.meter(metric("Verification.Success"))
|
||||
private val failureMeter = metrics.meter(metric("Verification.Failure"))
|
||||
|
||||
class VerificationResultForUnknownTransaction(nonce: Long) :
|
||||
Exception("Verification result arrived for unknown transaction nonce $nonce")
|
||||
|
||||
fun start(responseConsumer: ClientConsumer) {
|
||||
log.info("Starting out of process verification service")
|
||||
metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
|
||||
responseConsumer.setMessageHandler { message ->
|
||||
val response = VerifierApi.VerificationResponse.fromClientMessage(message)
|
||||
val handle = verificationHandles.remove(response.verificationId) ?: throw VerificationResultForUnknownTransaction(response.verificationId)
|
||||
handle.durationTimerContext.stop()
|
||||
val exception = response.exception
|
||||
if (exception == null) {
|
||||
successMeter.mark()
|
||||
handle.resultFuture.set(Unit)
|
||||
} else {
|
||||
failureMeter.mark()
|
||||
handle.resultFuture.setException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun verify(transaction: LedgerTransaction): CordaFuture<*> {
|
||||
log.info("Verifying ${transaction.id}")
|
||||
val future = openFuture<Unit>()
|
||||
val nonce = random63BitValue()
|
||||
verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time())
|
||||
sendRequest(nonce, transaction)
|
||||
return future
|
||||
}
|
||||
}
|
@ -11,8 +11,7 @@ import net.corda.testing.node.NotarySpec
|
||||
import java.nio.file.Path
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
InMemory
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user