diff --git a/.github/workflows/check-pr-title.yml b/.github/workflows/check-pr-title.yml index 6d45a2bd31..f99824a302 100644 --- a/.github/workflows/check-pr-title.yml +++ b/.github/workflows/check-pr-title.yml @@ -9,6 +9,6 @@ jobs: steps: - uses: morrisoncole/pr-lint-action@v1.4.1 with: - title-regex: '^((CORDA|AG|EG|ENT|INFRA|NAAS|ES)-\d+|NOTICK)(.*)' + title-regex: '^((CORDA|AG|EG|ENT|INFRA|ES)-\d+|NOTICK)(.*)' on-failed-regex-comment: "PR title failed to match regex -> `%regex%`" repo-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index d90befe6ae..d008a351dc 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.SerializationEnvironmentHelper @@ -52,7 +53,7 @@ class CordaRPCConnection private constructor( sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ): CordaRPCConnection { - val observersPool: ExecutorService = Executors.newCachedThreadPool() + val observersPool: ExecutorService = Executors.newCachedThreadPool(DefaultThreadFactory("RPCObserver")) return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps( addresses, username, diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 0bbf8acf0f..afaa51fe0b 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc.internal +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.client.rpc.ConnectionFailureException import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration @@ -99,7 +100,8 @@ class ReconnectingCordaRPCOps private constructor( ErrorInterceptingHandler(reconnectingRPCConnection)) as CordaRPCOps } } - private val retryFlowsPool = Executors.newScheduledThreadPool(1) + private val retryFlowsPool = Executors.newScheduledThreadPool(1, DefaultThreadFactory("FlowRetry")) + /** * This function runs a flow and retries until it completes successfully. * diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 3b6db58dcd..51d61cc214 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -1,3 +1,5 @@ +@file:Suppress("LongParameterList") + package net.corda.core.node.services import co.paralleluniverse.fibers.Suspendable @@ -197,8 +199,7 @@ class Vault(val states: Iterable>) { * 4) Status types used in this query: [StateStatus.UNCONSUMED], [StateStatus.CONSUMED], [StateStatus.ALL]. * 5) Other results as a [List] of any type (eg. aggregate function results with/without group by). * - * Note: currently otherResults are used only for Aggregate Functions (in which case, the states and statesMetadata - * results will be empty). + * Note: currently [otherResults] is used only for aggregate functions (in which case, [states] and [statesMetadata] will be empty). */ @CordaSerializable data class Page(val states: List>, @@ -213,11 +214,11 @@ class Vault(val states: Iterable>) { val contractStateClassName: String, val recordedTime: Instant, val consumedTime: Instant?, - val status: Vault.StateStatus, + val status: StateStatus, val notary: AbstractParty?, val lockId: String?, val lockUpdateTime: Instant?, - val relevancyStatus: Vault.RelevancyStatus? = null, + val relevancyStatus: RelevancyStatus? = null, val constraintInfo: ConstraintInfo? = null ) { fun copy( @@ -225,7 +226,7 @@ class Vault(val states: Iterable>) { contractStateClassName: String = this.contractStateClassName, recordedTime: Instant = this.recordedTime, consumedTime: Instant? = this.consumedTime, - status: Vault.StateStatus = this.status, + status: StateStatus = this.status, notary: AbstractParty? = this.notary, lockId: String? = this.lockId, lockUpdateTime: Instant? = this.lockUpdateTime @@ -237,11 +238,11 @@ class Vault(val states: Iterable>) { contractStateClassName: String = this.contractStateClassName, recordedTime: Instant = this.recordedTime, consumedTime: Instant? = this.consumedTime, - status: Vault.StateStatus = this.status, + status: StateStatus = this.status, notary: AbstractParty? = this.notary, lockId: String? = this.lockId, lockUpdateTime: Instant? = this.lockUpdateTime, - relevancyStatus: Vault.RelevancyStatus? + relevancyStatus: RelevancyStatus? ): StateMetadata { return StateMetadata(ref, contractStateClassName, recordedTime, consumedTime, status, notary, lockId, lockUpdateTime, relevancyStatus, ConstraintInfo(AlwaysAcceptAttachmentConstraint)) } @@ -249,9 +250,9 @@ class Vault(val states: Iterable>) { companion object { @Deprecated("No longer used. The vault does not emit empty updates") - val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet()) + val NoUpdate = Update(emptySet(), emptySet(), type = UpdateType.GENERAL, references = emptySet()) @Deprecated("No longer used. The vault does not emit empty updates") - val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE, references = emptySet()) + val NoNotaryUpdate = Update(emptySet(), emptySet(), type = UpdateType.NOTARY_CHANGE, references = emptySet()) } } @@ -302,7 +303,7 @@ interface VaultService { fun whenConsumed(ref: StateRef): CordaFuture> { val query = QueryCriteria.VaultQueryCriteria( stateRefs = listOf(ref), - status = Vault.StateStatus.CONSUMED + status = StateStatus.CONSUMED ) val result = trackBy(query) val snapshot = result.snapshot.states @@ -358,8 +359,8 @@ interface VaultService { /** * Helper function to determine spendable states and soft locking them. * Currently performance will be worse than for the hand optimised version in - * [Cash.unconsumedCashStatesForSpending]. However, this is fully generic and can operate with custom [FungibleState] - * and [FungibleAsset] states. + * [net.corda.finance.workflows.asset.selection.AbstractCashSelection.unconsumedCashStatesForSpending]. However, this is fully generic + * and can operate with custom [FungibleState] and [FungibleAsset] states. * @param lockId The [FlowLogic.runId]'s [UUID] of the current flow used to soft lock the states. * @param eligibleStatesQuery A custom query object that selects down to the appropriate subset of all states of the * [contractStateType]. e.g. by selecting on account, issuer, etc. The query is internally augmented with the diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 7be0ac6229..1c914c35c4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -42,8 +42,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, override fun start(): Started = synchronized(this) { check(started == null) { "start can't be called twice" } val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace) - val backupTransports = backupServerAddressPool.map { - p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace) + val backupTransports = backupServerAddressPool.mapIndexed { index, address -> + p2pConnectorTcpTransport(address, config, threadPoolName = "$threadPoolName-backup${index+1}", trace = trace) } log.info("Connecting to message broker: $serverAddress") diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index 24813ffaa8..6b5b353b3e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -122,6 +122,7 @@ class ArtemisTcpTransport { fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true, + threadPoolName: String = "RPCServer", trace: Boolean = false, remotingThreads: Int? = null): TransportConfiguration { val options = mutableMapOf() @@ -129,7 +130,7 @@ class ArtemisTcpTransport { config.keyStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) } - return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, "RPCServer", trace, remotingThreads) + return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, threadPoolName, trace, remotingThreads) } fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, @@ -147,14 +148,16 @@ class ArtemisTcpTransport { fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, + threadPoolName: String = "Internal-RPCClient", trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() config.addToTransportOptions(options) - return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace, null) + return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, threadPoolName, trace, null) } fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, + threadPoolName: String = "Internal-RPCServer", trace: Boolean = false, remotingThreads: Int? = null): TransportConfiguration { val options = mutableMapOf() @@ -165,7 +168,7 @@ class ArtemisTcpTransport { options, trustManagerFactory(requireNotNull(config.trustStore).get()), true, - "Internal-RPCServer", + threadPoolName, trace, remotingThreads ) @@ -210,7 +213,7 @@ class ArtemisTcpTransport { options[TransportConstants.VERIFY_HOST_PROP_NAME] = false } return createTransport( - NodeNettyConnectorFactory::class.java.name, + CordaNettyConnectorFactory::class.java.name, hostAndPort, protocols, options, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt index 23bb9d1428..a3c2109d32 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt @@ -1,8 +1,14 @@ @file:JvmName("ArtemisUtils") package net.corda.nodeapi.internal +import net.corda.core.internal.declaredField +import org.apache.activemq.artemis.utils.actors.ProcessorBase import java.nio.file.FileSystems import java.nio.file.Path +import java.util.concurrent.Executor +import java.util.concurrent.ThreadFactory +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicInteger /** * Require that the [Path] is on a default file system, and therefore is one that Artemis is willing to use. @@ -16,3 +22,29 @@ fun requireMessageSize(messageSize: Int, limit: Int) { require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" } } +val Executor.rootExecutor: Executor get() { + var executor: Executor = this + while (executor is ProcessorBase<*>) { + executor = executor.declaredField("delegate").value + } + return executor +} + +fun Executor.setThreadPoolName(threadPoolName: String) { + (rootExecutor as? ThreadPoolExecutor)?.let { it.threadFactory = NamedThreadFactory(threadPoolName, it.threadFactory) } +} + +private class NamedThreadFactory(poolName: String, private val delegate: ThreadFactory) : ThreadFactory { + companion object { + private val poolId = AtomicInteger(0) + } + + private val prefix = "$poolName-${poolId.incrementAndGet()}-" + private val nextId = AtomicInteger(0) + + override fun newThread(r: Runnable): Thread { + val thread = delegate.newThread(r) + thread.name = "$prefix${nextId.incrementAndGet()}" + return thread + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/CordaNettyConnectorFactory.kt similarity index 70% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/CordaNettyConnectorFactory.kt index 47e046566e..a9bdc519a9 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/CordaNettyConnectorFactory.kt @@ -14,15 +14,16 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService -class NodeNettyConnectorFactory : ConnectorFactory { +class CordaNettyConnectorFactory : ConnectorFactory { override fun createConnector(configuration: MutableMap?, handler: BufferHandler?, listener: ClientConnectionLifeCycleListener?, - closeExecutor: Executor?, - threadPool: Executor?, - scheduledThreadPool: ScheduledExecutorService?, + closeExecutor: Executor, + threadPool: Executor, + scheduledThreadPool: ScheduledExecutorService, protocolManager: ClientProtocolManager?): Connector { val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration) + setThreadPoolName(threadPool, closeExecutor, scheduledThreadPool, threadPoolName) val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) return NettyConnector( configuration, @@ -31,7 +32,7 @@ class NodeNettyConnectorFactory : ConnectorFactory { closeExecutor, threadPool, scheduledThreadPool, - MyClientProtocolManager(threadPoolName, trace) + MyClientProtocolManager("$threadPoolName-netty", trace) ) } @@ -39,6 +40,17 @@ class NodeNettyConnectorFactory : ConnectorFactory { override fun getDefaults(): Map = NettyConnector.DEFAULT_CONFIG + private fun setThreadPoolName(threadPool: Executor, closeExecutor: Executor, scheduledThreadPool: ScheduledExecutorService, name: String) { + threadPool.setThreadPoolName("$name-artemis") + // Artemis will actually wrap the same backing Executor to create multiple "OrderedExecutors". In this scenerio both the threadPool + // and the closeExecutor are the same when it comes to the pool names. If however they are different then given them separate names. + if (threadPool.rootExecutor !== closeExecutor.rootExecutor) { + closeExecutor.setThreadPoolName("$name-artemis-closer") + } + // The scheduler is separate + scheduledThreadPool.setThreadPoolName("$name-artemis-scheduler") + } + private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() { override fun addChannelHandlers(pipeline: ChannelPipeline) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index deb6ef999a..93ab5616de 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig +import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE @@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession import org.slf4j.MDC import rx.Subscription import java.time.Duration +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture @@ -53,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, maxMessageSize: Int, revocationConfig: RevocationConfig, enableSNI: Boolean, - private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider, private val bridgeMetricsService: BridgeMetricsService? = null, trace: Boolean, sslHandshakeTimeout: Duration?, @@ -78,9 +80,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore, private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig,useOpenSSL, enableSNI, trace = trace, _sslHandshakeTimeout = sslHandshakeTimeout) private var sharedEventLoopGroup: EventLoopGroup? = null + private var sslDelegatedTaskExecutor: ExecutorService? = null private var artemis: ArtemisSessionProvider? = null companion object { + private val log = contextLogger() private const val CORDA_NUM_BRIDGE_THREADS_PROP_NAME = "net.corda.nodeapi.amqpbridgemanager.NumBridgeThreads" @@ -97,18 +101,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore, * however Artemis and the remote Corda instanced will deduplicate these messages. */ @Suppress("TooManyFunctions") - private class AMQPBridge(val sourceX500Name: String, - val queueName: String, - val targets: List, - val allowedRemoteLegalNames: Set, - private val amqpConfig: AMQPConfiguration, - sharedEventGroup: EventLoopGroup, - private val artemis: ArtemisSessionProvider, - private val bridgeMetricsService: BridgeMetricsService?, - private val bridgeConnectionTTLSeconds: Int) { - companion object { - private val log = contextLogger() - } + private inner class AMQPBridge(val sourceX500Name: String, + val queueName: String, + val targets: List, + val allowedRemoteLegalNames: Set, + private val amqpConfig: AMQPConfiguration) { private fun withMDC(block: () -> Unit) { val oldMDC = MDC.getCopyOfContextMap() ?: emptyMap() @@ -134,13 +131,18 @@ open class AMQPBridgeManager(keyStore: CertificateStore, private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } - val amqpClient = AMQPClient(targets, allowedRemoteLegalNames, amqpConfig, sharedThreadPool = sharedEventGroup) + val amqpClient = AMQPClient( + targets, + allowedRemoteLegalNames, + amqpConfig, + AMQPClient.NettyThreading.Shared(sharedEventLoopGroup!!, sslDelegatedTaskExecutor!!) + ) private var session: ClientSession? = null private var consumer: ClientConsumer? = null private var connectedSubscription: Subscription? = null @Volatile private var messagesReceived: Boolean = false - private val eventLoop: EventLoop = sharedEventGroup.next() + private val eventLoop: EventLoop = sharedEventLoopGroup!!.next() private var artemisState: ArtemisState = ArtemisState.STOPPED set(value) { logDebugWithMDC { "State change $field to $value" } @@ -152,32 +154,9 @@ open class AMQPBridgeManager(keyStore: CertificateStore, private var scheduledExecutorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("bridge-connection-reset-%d").build()) - @Suppress("ClassNaming") - private sealed class ArtemisState { - object STARTING : ArtemisState() - data class STARTED(override val pending: ScheduledFuture) : ArtemisState() - - object CHECKING : ArtemisState() - object RESTARTED : ArtemisState() - object RECEIVING : ArtemisState() - - object AMQP_STOPPED : ArtemisState() - object AMQP_STARTING : ArtemisState() - object AMQP_STARTED : ArtemisState() - object AMQP_RESTARTED : ArtemisState() - - object STOPPING : ArtemisState() - object STOPPED : ArtemisState() - data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture) : ArtemisState() - - open val pending: ScheduledFuture? = null - - override fun toString(): String = javaClass.simpleName - } - private fun artemis(inProgress: ArtemisState, block: (precedingState: ArtemisState) -> ArtemisState) { val runnable = { - synchronized(artemis) { + synchronized(artemis!!) { try { val precedingState = artemisState artemisState.pending?.cancel(false) @@ -253,7 +232,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } } artemis(ArtemisState.STARTING) { - val startedArtemis = artemis.started + val startedArtemis = artemis!!.started if (startedArtemis == null) { logInfoWithMDC("Bridge Connected but Artemis is disconnected") ArtemisState.STOPPED @@ -457,6 +436,29 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } } + @Suppress("ClassNaming") + private sealed class ArtemisState { + object STARTING : ArtemisState() + data class STARTED(override val pending: ScheduledFuture) : ArtemisState() + + object CHECKING : ArtemisState() + object RESTARTED : ArtemisState() + object RECEIVING : ArtemisState() + + object AMQP_STOPPED : ArtemisState() + object AMQP_STARTING : ArtemisState() + object AMQP_STARTED : ArtemisState() + object AMQP_RESTARTED : ArtemisState() + + object STOPPING : ArtemisState() + object STOPPED : ArtemisState() + data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture) : ArtemisState() + + open val pending: ScheduledFuture? = null + + override fun toString(): String = javaClass.simpleName + } + override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { lock.withLock { val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() } @@ -467,8 +469,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig, useOpenSsl, enableSNI, sourceX500Name, trace, sslHandshakeTimeout) } - val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, - bridgeMetricsService, bridgeConnectionTTLSeconds) + val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig) bridges += newBridge bridgeMetricsService?.bridgeCreated(targets, legalNames) newBridge @@ -497,15 +498,16 @@ open class AMQPBridgeManager(keyStore: CertificateStore, // queueNamesToBridgesMap returns a mutable list, .toList converts it to a immutable list so it won't be changed by the [destroyBridge] method. val bridges = queueNamesToBridgesMap[queueName]?.toList() destroyBridge(queueName, bridges?.flatMap { it.targets } ?: emptyList()) - bridges?.map { + bridges?.associate { it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.allowedRemoteLegalNames.toList(), serviceAddress = false) - }?.toMap() ?: emptyMap() + } ?: emptyMap() } } override fun start() { - sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY)) - val artemis = artemisMessageClientFactory() + sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("NettyBridge", Thread.MAX_PRIORITY)) + sslDelegatedTaskExecutor = sslDelegatedTaskExecutor("NettyBridge") + val artemis = artemisMessageClientFactory("ArtemisBridge") this.artemis = artemis artemis.start() } @@ -522,6 +524,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore, sharedEventLoopGroup = null queueNamesToBridgesMap.clear() artemis?.stop() + sslDelegatedTaskExecutor?.shutdown() + sslDelegatedTaskExecutor = null } } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 0fee8f1fba..357088bc0a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -35,7 +35,7 @@ class BridgeControlListener(private val keyStore: CertificateStore, maxMessageSize: Int, revocationConfig: RevocationConfig, enableSNI: Boolean, - private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider, bridgeMetricsService: BridgeMetricsService? = null, trace: Boolean = false, sslHandshakeTimeout: Duration? = null, @@ -80,7 +80,7 @@ class BridgeControlListener(private val keyStore: CertificateStore, bridgeNotifyQueue = "$BRIDGE_NOTIFY.$queueDisambiguityId" bridgeManager.start() - val artemis = artemisMessageClientFactory() + val artemis = artemisMessageClientFactory("BridgeControl") this.artemis = artemis artemis.start() val artemisClient = artemis.started!! diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt index e9ac1ca522..2dd9f8bff0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt @@ -37,7 +37,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore, maxMessageSize: Int, revocationConfig: RevocationConfig, enableSNI: Boolean, - private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider, private val bridgeMetricsService: BridgeMetricsService? = null, private val isLocalInbox: (String) -> Boolean, trace: Boolean, @@ -204,7 +204,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore, override fun start() { super.start() - val artemis = artemisMessageClientFactory() + val artemis = artemisMessageClientFactory("LoopbackBridge") this.artemis = artemis artemis.start() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index 3c18830147..c502817029 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -32,7 +32,9 @@ import rx.Observable import rx.subjects.PublishSubject import java.lang.Long.min import java.net.InetSocketAddress +import java.util.concurrent.Executor import java.util.concurrent.ExecutorService +import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -61,8 +63,7 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA class AMQPClient(private val targets: List, val allowedRemoteLegalNames: Set, private val configuration: AMQPConfiguration, - private val sharedThreadPool: EventLoopGroup? = null, - private val threadPoolName: String = "AMQPClient", + private val nettyThreading: NettyThreading = NettyThreading.NonShared("AMQPClient"), private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON) : AutoCloseable { companion object { init { @@ -82,7 +83,6 @@ class AMQPClient(private val targets: List, private val lock = ReentrantLock() @Volatile private var started: Boolean = false - private var workerGroup: EventLoopGroup? = null @Volatile private var clientChannel: Channel? = null // Offset into the list of targets, so that we can implement round-robin reconnect logic. @@ -94,7 +94,6 @@ class AMQPClient(private val targets: List, private var amqpActive = false @Volatile private var amqpChannelHandler: ChannelHandler? = null - private var sslDelegatedTaskExecutor: ExecutorService? = null val localAddressString: String get() = clientChannel?.localAddress()?.toString() ?: "" @@ -123,7 +122,7 @@ class AMQPClient(private val targets: List, log.info("Failed to connect to $currentTarget", future.cause()) if (started) { - workerGroup?.schedule({ + nettyThreading.eventLoopGroup.schedule({ nextTarget() restart() }, retryInterval, TimeUnit.MILLISECONDS) @@ -142,7 +141,7 @@ class AMQPClient(private val targets: List, clientChannel = null if (started && !amqpActive) { log.debug { "Scheduling restart of $currentTarget (AMQP inactive)" } - workerGroup?.schedule({ + nettyThreading.eventLoopGroup.schedule({ nextTarget() restart() }, retryInterval, TimeUnit.MILLISECONDS) @@ -198,7 +197,6 @@ class AMQPClient(private val targets: List, val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration) val target = parent.currentTarget - val delegatedTaskExecutor = checkNotNull(parent.sslDelegatedTaskExecutor) val handler = if (parent.configuration.useOpenSsl) { createClientOpenSslHandler( target, @@ -206,7 +204,7 @@ class AMQPClient(private val targets: List, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc(), - delegatedTaskExecutor + parent.nettyThreading.sslDelegatedTaskExecutor ) } else { createClientSslHandler( @@ -214,7 +212,7 @@ class AMQPClient(private val targets: List, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, - delegatedTaskExecutor + parent.nettyThreading.sslDelegatedTaskExecutor ) } handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis() @@ -256,7 +254,7 @@ class AMQPClient(private val targets: List, if (started && amqpActive) { log.debug { "Scheduling restart of $currentTarget (AMQP active)" } - workerGroup?.schedule({ + nettyThreading.eventLoopGroup.schedule({ nextTarget() restart() }, retryInterval, TimeUnit.MILLISECONDS) @@ -273,8 +271,7 @@ class AMQPClient(private val targets: List, return } log.info("Connect to: $currentTarget") - sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName) - workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY)) + (nettyThreading as? NettyThreading.NonShared)?.start() started = true restart() } @@ -286,7 +283,7 @@ class AMQPClient(private val targets: List, } val bootstrap = Bootstrap() // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux - bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this)) + bootstrap.group(nettyThreading.eventLoopGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this)) // Delegate DNS Resolution to the proxy side, if we are using proxy. if (configuration.proxyConfig != null) { bootstrap.resolver(NoopAddressResolverGroup.INSTANCE) @@ -300,16 +297,12 @@ class AMQPClient(private val targets: List, lock.withLock { log.info("Stopping connection to: $currentTarget, Local address: $localAddressString") started = false - if (sharedThreadPool == null) { - workerGroup?.shutdownGracefully() - workerGroup?.terminationFuture()?.sync() + if (nettyThreading is NettyThreading.NonShared) { + nettyThreading.stop() } else { clientChannel?.close()?.sync() } clientChannel = null - workerGroup = null - sslDelegatedTaskExecutor?.shutdown() - sslDelegatedTaskExecutor = null log.info("Stopped connection to $currentTarget") } } @@ -350,4 +343,36 @@ class AMQPClient(private val targets: List, private val _onConnection = PublishSubject.create().toSerialized() val onConnection: Observable get() = _onConnection + + + sealed class NettyThreading { + abstract val eventLoopGroup: EventLoopGroup + abstract val sslDelegatedTaskExecutor: Executor + + class Shared(override val eventLoopGroup: EventLoopGroup, + override val sslDelegatedTaskExecutor: ExecutorService = sslDelegatedTaskExecutor("AMQPClient")) : NettyThreading() + + class NonShared(val threadPoolName: String) : NettyThreading() { + private var _eventLoopGroup: NioEventLoopGroup? = null + override val eventLoopGroup: EventLoopGroup get() = checkNotNull(_eventLoopGroup) + + private var _sslDelegatedTaskExecutor: ThreadPoolExecutor? = null + override val sslDelegatedTaskExecutor: ExecutorService get() = checkNotNull(_sslDelegatedTaskExecutor) + + fun start() { + check(_eventLoopGroup == null) + check(_sslDelegatedTaskExecutor == null) + _eventLoopGroup = NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY)) + _sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName) + } + + fun stop() { + eventLoopGroup.shutdownGracefully() + eventLoopGroup.terminationFuture().sync() + sslDelegatedTaskExecutor.shutdown() + _eventLoopGroup = null + _sslDelegatedTaskExecutor = null + } + } + } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index b460cc2268..d7649fcfef 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -301,7 +301,7 @@ abstract class AbstractServerRevocationTest { listOf(NetworkHostAndPort("localhost", targetPort)), setOf(CHARLIE_NAME), amqpConfig, - threadPoolName = legalName.organisation, + nettyThreading = AMQPClient.NettyThreading.NonShared(legalName.organisation), distPointCrlSource = CertDistPointCrlSource(connectTimeout = crlConnectTimeout) ) amqpClients += amqpClient diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index fde04359da..25f6dce6e8 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -509,7 +509,7 @@ class ProtonWrapperTests { listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), amqpConfig, - sharedThreadPool = sharedEventGroup) + nettyThreading = AMQPClient.NettyThreading.Shared(sharedEventGroup)) } private fun createServer(port: Int, diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 9370c64df1..3ff93abef5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry import com.google.common.collect.MutableClassToInstanceMap import com.google.common.util.concurrent.MoreExecutors import com.zaxxer.hikari.pool.HikariPool +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.common.logging.errorReporting.NodeDatabaseErrors import net.corda.confidential.SwapIdentitiesFlow import net.corda.core.CordaException @@ -333,7 +334,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private val schedulerService = makeNodeSchedulerService() private val cordappServices = MutableClassToInstanceMap.create() - private val shutdownExecutor = Executors.newSingleThreadExecutor() + private val shutdownExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("Shutdown")) protected abstract val transactionVerifierWorkerCount: Int /** @@ -769,7 +770,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } else { 1.days } - val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater")) + val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("NetworkMapPublisher")) executor.submit(object : Runnable { override fun run() { val republishInterval = try { @@ -1076,7 +1077,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, networkParameters: NetworkParameters) protected open fun makeVaultService(keyManagementService: KeyManagementService, - services: ServicesForResolution, + services: NodeServicesForResolution, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, cordappLoader.appClassLoader) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 87f27d7415..d83111487c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -415,12 +415,13 @@ open class Node(configuration: NodeConfiguration, } private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters): BridgeControlListener { - val artemisMessagingClientFactory = { + val artemisMessagingClientFactory = { threadPoolName: String -> ArtemisMessagingClient( configuration.p2pSslOptions, serverAddress, networkParameters.maxMessageSize, - failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) } + failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) }, + threadPoolName = threadPoolName ) } return BridgeControlListener( @@ -431,7 +432,8 @@ open class Node(configuration: NodeConfiguration, networkParameters.maxMessageSize, configuration.crlCheckSoftFail.toRevocationConfig(), false, - artemisMessagingClientFactory) + artemisMessagingClientFactory + ) } private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? { diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt b/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt new file mode 100644 index 0000000000..5baa528297 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt @@ -0,0 +1,15 @@ +package net.corda.node.internal + +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionResolutionException +import net.corda.core.node.ServicesForResolution +import java.util.LinkedHashSet + +interface NodeServicesForResolution : ServicesForResolution { + @Throws(TransactionResolutionException::class) + override fun loadStates(stateRefs: Set): Set> = loadStates(stateRefs, LinkedHashSet()) + + fun >> loadStates(input: Iterable, output: C): C +} diff --git a/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt b/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt index 06e46992d4..ffb21894c1 100644 --- a/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt @@ -1,11 +1,18 @@ package net.corda.node.internal -import net.corda.core.contracts.* +import net.corda.core.contracts.Attachment +import net.corda.core.contracts.AttachmentResolutionException +import net.corda.core.contracts.ContractAttachment +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionResolutionException +import net.corda.core.contracts.TransactionState import net.corda.core.cordapp.CordappProvider import net.corda.core.crypto.SecureHash import net.corda.core.internal.SerializedStateAndRef +import net.corda.core.internal.uncheckedCast import net.corda.core.node.NetworkParameters -import net.corda.core.node.ServicesForResolution import net.corda.core.node.services.AttachmentStorage import net.corda.core.node.services.IdentityService import net.corda.core.node.services.NetworkParametersService @@ -23,7 +30,7 @@ data class ServicesForResolutionImpl( override val cordappProvider: CordappProvider, override val networkParametersService: NetworkParametersService, private val validatedTransactions: TransactionStorage -) : ServicesForResolution { +) : NodeServicesForResolution { override val networkParameters: NetworkParameters get() = networkParametersService.lookup(networkParametersService.currentHash) ?: throw IllegalArgumentException("No current parameters in network parameters storage") @@ -32,12 +39,11 @@ data class ServicesForResolutionImpl( return toBaseTransaction(stateRef.txhash).outputs[stateRef.index] } - @Throws(TransactionResolutionException::class) - override fun loadStates(stateRefs: Set): Set> { + override fun >> loadStates(input: Iterable, output: C): C { val baseTxs = HashMap() - return stateRefs.mapTo(LinkedHashSet()) { stateRef -> + return input.mapTo(output) { stateRef -> val baseTx = baseTxs.computeIfAbsent(stateRef.txhash, ::toBaseTransaction) - StateAndRef(baseTx.outputs[stateRef.index], stateRef) + StateAndRef(uncheckedCast(baseTx.outputs[stateRef.index]), stateRef) } } diff --git a/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt b/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt index dad25cf69f..28d8dc3a89 100644 --- a/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt +++ b/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt @@ -2,7 +2,6 @@ package net.corda.node.migration import liquibase.database.Database import net.corda.core.contracts.* -import net.corda.core.crypto.SecureHash import net.corda.core.identity.CordaX500Name import net.corda.core.node.services.Vault import net.corda.core.schemas.MappedSchema @@ -19,6 +18,7 @@ import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSchemaV1 +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.SchemaMigration @@ -62,8 +62,7 @@ class VaultStateMigration : CordaMigration() { private fun getStateAndRef(persistentState: VaultSchemaV1.VaultStates): StateAndRef { val persistentStateRef = persistentState.stateRef ?: throw VaultStateMigrationException("Persistent state ref missing from state") - val txHash = SecureHash.create(persistentStateRef.txId) - val stateRef = StateRef(txHash, persistentStateRef.index) + val stateRef = persistentStateRef.toStateRef() val state = try { servicesForResolution.loadState(stateRef) } catch (e: Exception) { diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index f13d1d73bf..b1e1ceb1f0 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -2,6 +2,7 @@ package net.corda.node.services.events import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationOrigin @@ -148,7 +149,7 @@ class NodeSchedulerService(private val clock: CordaClock, // from the database private val startingStateRefs: MutableSet = ConcurrentHashMap.newKeySet() private val mutex = ThreadBox(InnerState()) - private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("SchedulerService")) // if there's nothing to do, check every minute if something fell through the cracks. // any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect diff --git a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt index 2208eef88f..f62db2eee4 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt @@ -2,8 +2,8 @@ package net.corda.node.services.events import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash import net.corda.core.schemas.PersistentStateRef +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence interface ScheduledFlowRepository { @@ -25,9 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu } private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair { - val txId = scheduledStateRecord.output.txId - val index = scheduledStateRecord.output.index - return Pair(StateRef(SecureHash.create(txId), index), ScheduledStateRef(StateRef(SecureHash.create(txId), index), scheduledStateRecord.scheduledAt)) + val stateRef = scheduledStateRecord.output.toStateRef() + return Pair(stateRef, ScheduledStateRef(stateRef, scheduledStateRecord.scheduledAt)) } override fun delete(key: StateRef): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index f03242d5e0..c0a6ed0388 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -65,7 +65,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, private val messagingServerAddress: NetworkHostAndPort, private val maxMessageSize: Int, private val journalBufferTimeout : Int? = null, - private val threadPoolName: String = "ArtemisServer", + private val threadPoolName: String = "P2PServer", private val trace: Boolean = false, private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON, private val remotingThreads: Int? = null) : ArtemisBroker, SingletonSerializeAsToken() { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt index ddca20e266..bc37cd747e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt @@ -11,6 +11,8 @@ import net.corda.core.internal.declaredField import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor +import net.corda.nodeapi.internal.setThreadPoolName +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.BaseInterceptor import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor import org.apache.activemq.artemis.core.server.balancing.RedirectHandler @@ -41,10 +43,23 @@ class NodeNettyAcceptorFactory : AcceptorFactory { handler: BufferHandler?, listener: ServerConnectionLifeCycleListener?, threadPool: Executor, - scheduledThreadPool: ScheduledExecutorService?, + scheduledThreadPool: ScheduledExecutorService, protocolMap: MutableMap, RedirectHandler<*>>>?): Acceptor { + val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Acceptor", configuration) + threadPool.setThreadPoolName("$threadPoolName-artemis") + scheduledThreadPool.setThreadPoolName("$threadPoolName-artemis-scheduler") val failureExecutor = OrderedExecutor(threadPool) - return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) + return NodeNettyAcceptor( + name, + clusterConnection, + configuration, + handler, + listener, + scheduledThreadPool, + failureExecutor, + protocolMap, + "$threadPoolName-netty" + ) } @@ -55,7 +70,8 @@ class NodeNettyAcceptorFactory : AcceptorFactory { listener: ServerConnectionLifeCycleListener?, scheduledThreadPool: ScheduledExecutorService?, failureExecutor: Executor, - protocolMap: MutableMap, RedirectHandler<*>>>?) : + protocolMap: MutableMap, RedirectHandler<*>>>?, + private val threadPoolName: String) : NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) { companion object { @@ -68,7 +84,6 @@ class NodeNettyAcceptorFactory : AcceptorFactory { } } - private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration) private val sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName) private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index fac12a9343..584b050425 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -74,7 +74,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, } private val parametersUpdatesTrack = PublishSubject.create() - private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread")).apply { + private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("NetworkMapUpdater")).apply { executeExistingDelayedTasksAfterShutdownPolicy = false } private var newNetworkParameters: Pair? = null @@ -261,9 +261,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, //as HTTP GET is mostly IO bound, use more threads than CPU's //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24) - val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread")) + val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool( + threadsToUseForNetworkMapDownload, + NamedThreadFactory("NetworkMapUpdaterNodeInfoDownload") + ) //DB insert is single threaded - use a single threaded executor for it. - val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread")) + val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsert")) val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes) val networkMapDownloadStartTime = System.currentTimeMillis() if (hashesToFetch.isNotEmpty()) { diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/InternalRPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/rpc/InternalRPCMessagingClient.kt index 8ed025549c..e48cdf16c0 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/InternalRPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/InternalRPCMessagingClient.kt @@ -22,8 +22,7 @@ class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serv private var rpcServer: RPCServer? = null fun init(rpcOps: List, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) { - - val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig) + val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig, threadPoolName = "RPCClient") locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt index 498decd2e7..13af138c8e 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt @@ -30,10 +30,10 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, setDirectories(baseDirectory) val acceptorConfigurationsSet = mutableSetOf( - rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl) + rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl, threadPoolName = "RPCServer") ) adminAddress?.let { - acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration) + acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration, threadPoolName = "RPCServerAdmin") } acceptorConfigurations = acceptorConfigurationsSet diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index c08515ab0e..734b2b8234 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.core.flows.FlowSession import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine @@ -22,10 +23,6 @@ internal class FlowMonitor( ) : LifecycleSupport { private companion object { - private fun defaultScheduler(): ScheduledExecutorService { - return Executors.newSingleThreadScheduledExecutor() - } - private val logger = loggerFor() } @@ -36,7 +33,7 @@ internal class FlowMonitor( override fun start() { synchronized(this) { if (scheduler == null) { - scheduler = defaultScheduler() + scheduler = Executors.newSingleThreadScheduledExecutor(DefaultThreadFactory("FlowMonitor")) shutdownScheduler = true } scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty() }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 66ec2007fa..aa69d50db3 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -25,6 +25,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.node.services.vault.toStateRef import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX @@ -157,13 +158,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId - val index = it.id.index - Pair( - StateRef(txhash = SecureHash.create(txId), index = index), - SecureHash.create(it.consumingTxHash) - ) - + Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash)) }, toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> CommittedState( diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 6db962cdce..ac0913604c 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -3,28 +3,65 @@ package net.corda.node.services.vault import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import net.corda.core.CordaRuntimeException -import net.corda.core.contracts.* +import net.corda.core.contracts.Amount +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.FungibleAsset +import net.corda.core.contracts.FungibleState +import net.corda.core.contracts.Issued +import net.corda.core.contracts.OwnableState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash import net.corda.core.crypto.containsAny import net.corda.core.flows.HospitalizeFlowException -import net.corda.core.internal.* +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.TransactionDeserialisationException +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.tee +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed -import net.corda.core.node.ServicesForResolution import net.corda.core.node.StatesToRecord -import net.corda.core.node.services.* -import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo -import net.corda.core.node.services.vault.* +import net.corda.core.node.services.KeyManagementService +import net.corda.core.node.services.StatesNotAvailableException +import net.corda.core.node.services.Vault +import net.corda.core.node.services.VaultQueryException +import net.corda.core.node.services.VaultService +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.Sort +import net.corda.core.node.services.vault.SortAttribute +import net.corda.core.node.services.vault.builder import net.corda.core.observable.internal.OnResilientSubscribe import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.* -import net.corda.core.utilities.* +import net.corda.core.transactions.ContractUpgradeWireTransaction +import net.corda.core.transactions.CoreTransaction +import net.corda.core.transactions.FullTransaction +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.NotaryChangeWireTransaction +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.core.utilities.toNonEmptySet +import net.corda.core.utilities.trace +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.services.api.SchemaService import net.corda.node.services.api.VaultServiceInternal import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.nodeapi.internal.persistence.* +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull +import net.corda.nodeapi.internal.persistence.currentDBSession +import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import org.hibernate.Session +import org.hibernate.query.Query import rx.Observable import rx.exceptions.OnErrorNotImplementedException import rx.subjects.PublishSubject @@ -32,9 +69,11 @@ import java.security.PublicKey import java.sql.SQLException import java.time.Clock import java.time.Instant -import java.util.* +import java.util.Arrays +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArraySet +import java.util.stream.Stream import javax.persistence.PersistenceException import javax.persistence.Tuple import javax.persistence.criteria.CriteriaBuilder @@ -54,9 +93,9 @@ import javax.persistence.criteria.Root class NodeVaultService( private val clock: Clock, private val keyManagementService: KeyManagementService, - private val servicesForResolution: ServicesForResolution, + private val servicesForResolution: NodeServicesForResolution, private val database: CordaPersistence, - private val schemaService: SchemaService, + schemaService: SchemaService, private val appClassloader: ClassLoader ) : SingletonSerializeAsToken(), VaultServiceInternal { companion object { @@ -196,7 +235,7 @@ class NodeVaultService( if (lockId != null) { lockId = null lockUpdateTime = clock.instant() - log.trace("Releasing soft lock on consumed state: $stateRef") + log.trace { "Releasing soft lock on consumed state: $stateRef" } } session.save(state) } @@ -227,7 +266,7 @@ class NodeVaultService( } // we are not inside a flow, we are most likely inside a CordaService; // we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates. - return _rawUpdatesPublisher.resilientOnError() + _rawUpdatesPublisher.resilientOnError() } override val updates: Observable> @@ -639,7 +678,23 @@ class NodeVaultService( @Throws(VaultQueryException::class) override fun _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class): Vault.Page { try { - return _queryBy(criteria, paging, sorting, contractStateType, false) + // We decrement by one if the client requests MAX_VALUE, assuming they can not notice this because they don't have enough memory + // to request MAX_VALUE states at once. + val validPaging = if (paging.pageSize == Integer.MAX_VALUE) { + paging.copy(pageSize = Integer.MAX_VALUE - 1) + } else { + checkVaultQuery(paging.pageSize >= 1) { "Page specification: invalid page size ${paging.pageSize} [minimum is 1]" } + paging + } + if (!validPaging.isDefault) { + checkVaultQuery(validPaging.pageNumber >= DEFAULT_PAGE_NUM) { + "Page specification: invalid page number ${validPaging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]" + } + } + log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $validPaging, sorting: $sorting" } + return database.transaction { + queryBy(criteria, validPaging, sorting, contractStateType) + } } catch (e: VaultQueryException) { throw e } catch (e: Exception) { @@ -647,100 +702,90 @@ class NodeVaultService( } } - @Throws(VaultQueryException::class) - private fun _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class, skipPagingChecks: Boolean): Vault.Page { - // We decrement by one if the client requests MAX_PAGE_SIZE, assuming they can not notice this because they don't have enough memory - // to request `MAX_PAGE_SIZE` states at once. - val paging = if (paging_.pageSize == Integer.MAX_VALUE) { - paging_.copy(pageSize = Integer.MAX_VALUE - 1) - } else { - paging_ + private fun queryBy(criteria: QueryCriteria, + paging: PageSpecification, + sorting: Sort, + contractStateType: Class): Vault.Page { + // calculate total results where a page specification has been defined + val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType) + + val (query, stateTypes) = createQuery(criteria, contractStateType, sorting) + query.setResultWindow(paging) + + val statesMetadata: MutableList = mutableListOf() + val otherResults: MutableList = mutableListOf() + + query.resultStream(paging).use { results -> + results.forEach { result -> + val result0 = result[0] + if (result0 is VaultSchemaV1.VaultStates) { + statesMetadata.add(result0.toStateMetadata()) + } else { + log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" } + otherResults.addAll(result.toArray().asList()) + } + } } - log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting" } - return database.transaction { - // calculate total results where a page specification has been defined - var totalStates = -1L - if (!skipPagingChecks && !paging.isDefault) { - val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() } - val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL) - val results = _queryBy(criteria.and(countCriteria), PageSpecification(), Sort(emptyList()), contractStateType, true) // only skip pagination checks for total results count query - totalStates = results.otherResults.last() as Long - } - val session = getSession() + val states: List> = servicesForResolution.loadStates( + statesMetadata.mapTo(LinkedHashSet()) { it.ref }, + ArrayList() + ) - val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java) - val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java) - - // TODO: revisit (use single instance of parser for all queries) - val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates) - - // parse criteria and build where predicates - criteriaParser.parse(criteria, sorting) - - // prepare query for execution - val query = session.createQuery(criteriaQuery) - - // pagination checks - if (!skipPagingChecks && !paging.isDefault) { - // pagination - if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]") - if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [minimum is 1]") - if (paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum is $MAX_PAGE_SIZE]") - } - - // For both SQLServer and PostgresSQL, firstResult must be >= 0. So we set a floor at 0. - // TODO: This is a catch-all solution. But why is the default pageNumber set to be -1 in the first place? - // Even if we set the default pageNumber to be 1 instead, that may not cover the non-default cases. - // So the floor may be necessary anyway. - query.firstResult = maxOf(0, (paging.pageNumber - 1) * paging.pageSize) - val pageSize = paging.pageSize + 1 - query.maxResults = if (pageSize > 0) pageSize else Integer.MAX_VALUE // detection too many results, protected against overflow - - // execution - val results = query.resultList + return Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults) + } + private fun Query.resultStream(paging: PageSpecification): Stream { + return if (paging.isDefault) { + val allResults = resultList // final pagination check (fail-fast on too many results when no pagination specified) - if (!skipPagingChecks && paging.isDefault && results.size > DEFAULT_PAGE_SIZE) { - throw VaultQueryException("There are ${results.size} results, which exceeds the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. In order to retrieve these results, provide a `PageSpecification(pageNumber, pageSize)` to the method invoked.") + checkVaultQuery(allResults.size != paging.pageSize + 1) { + "There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " + + "In order to retrieve these results, provide a PageSpecification to the method invoked." } - val statesAndRefs: MutableList> = mutableListOf() - val statesMeta: MutableList = mutableListOf() - val otherResults: MutableList = mutableListOf() - val stateRefs = mutableSetOf() - - results.asSequence() - .forEachIndexed { index, result -> - if (result[0] is VaultSchemaV1.VaultStates) { - if (!paging.isDefault && index == paging.pageSize) // skip last result if paged - return@forEachIndexed - val vaultState = result[0] as VaultSchemaV1.VaultStates - val stateRef = StateRef(SecureHash.create(vaultState.stateRef!!.txId), vaultState.stateRef!!.index) - stateRefs.add(stateRef) - statesMeta.add(Vault.StateMetadata(stateRef, - vaultState.contractStateClassName, - vaultState.recordedTime, - vaultState.consumedTime, - vaultState.stateStatus, - vaultState.notary, - vaultState.lockId, - vaultState.lockUpdateTime, - vaultState.relevancyStatus, - constraintInfo(vaultState.constraintType, vaultState.constraintData) - )) - } else { - // TODO: improve typing of returned other results - log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" } - otherResults.addAll(result.toArray().asList()) - } - } - if (stateRefs.isNotEmpty()) - statesAndRefs.addAll(uncheckedCast(servicesForResolution.loadStates(stateRefs))) - - Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults) + allResults.stream() + } else { + stream() } } + private fun Query<*>.setResultWindow(paging: PageSpecification) { + if (paging.isDefault) { + // For both SQLServer and PostgresSQL, firstResult must be >= 0. + firstResult = 0 + // Peek ahead and see if there are more results in case pagination should be done + maxResults = paging.pageSize + 1 + } else { + firstResult = (paging.pageNumber - 1) * paging.pageSize + maxResults = paging.pageSize + } + } + + private fun queryTotalStateCount(baseCriteria: QueryCriteria, contractStateType: Class): Long { + val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() } + val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL) + val criteria = baseCriteria.and(countCriteria) + val (query) = createQuery(criteria, contractStateType, null) + val results = query.resultList + return results.last().toArray().last() as Long + } + + private fun createQuery(criteria: QueryCriteria, + contractStateType: Class, + sorting: Sort?): Pair, Vault.StateStatus> { + val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java) + val criteriaParser = HibernateQueryCriteriaParser( + contractStateType, + contractStateTypeMappings, + criteriaBuilder, + criteriaQuery, + criteriaQuery.from(VaultSchemaV1.VaultStates::class.java) + ) + criteriaParser.parse(criteria, sorting) + val query = getSession().createQuery(criteriaQuery) + return Pair(query, criteriaParser.stateTypes) + } + /** * Returns a [DataFeed] containing the results of the provided query, along with the associated observable, containing any subsequent updates. * @@ -775,6 +820,12 @@ class NodeVaultService( } } + private inline fun checkVaultQuery(value: Boolean, lazyMessage: () -> Any) { + if (!value) { + throw VaultQueryException(lazyMessage().toString()) + } + } + private fun filterContractStates(update: Vault.Update, contractStateType: Class) = update.copy(consumed = filterByContractState(contractStateType, update.consumed), produced = filterByContractState(contractStateType, update.produced)) @@ -802,6 +853,7 @@ class NodeVaultService( } private fun getSession() = database.currentOrNew().session + /** * Derive list from existing vault states and then incrementally update using vault observables */ diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index 06844d40d0..09c71fe1f7 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -2,7 +2,9 @@ package net.corda.node.services.vault import net.corda.core.contracts.ContractState import net.corda.core.contracts.MAX_ISSUER_REF_SIZE +import net.corda.core.contracts.StateRef import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.toStringShort import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party @@ -192,3 +194,19 @@ object VaultSchemaV1 : MappedSchema( ) : IndirectStatePersistable } +fun PersistentStateRef.toStateRef(): StateRef = StateRef(SecureHash.create(txId), index) + +fun VaultSchemaV1.VaultStates.toStateMetadata(): Vault.StateMetadata { + return Vault.StateMetadata( + stateRef!!.toStateRef(), + contractStateClassName, + recordedTime, + consumedTime, + stateStatus, + notary, + lockId, + lockUpdateTime, + relevancyStatus, + Vault.ConstraintInfo.constraintInfo(constraintType, constraintData) + ) +} diff --git a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt index a570ccd7b5..76094c2a1d 100644 --- a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt +++ b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt @@ -21,6 +21,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.services.vault.toStateRef import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import java.security.PublicKey @@ -41,6 +42,8 @@ class BFTSmartNotaryService( ) : NotaryService() { companion object { private val log = contextLogger() + + @Suppress("unused") // Used by NotaryLoader via reflection @JvmStatic val serializationFilter get() = { clazz: Class<*> -> @@ -147,12 +150,7 @@ class BFTSmartNotaryService( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId - val index = it.id.index - Pair( - StateRef(txhash = SecureHash.create(txId), index = index), - SecureHash.create(it.consumingTxHash) - ) + Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash)) }, toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> CommittedState( diff --git a/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt index d38a3f35b7..b678478da6 100644 --- a/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -24,6 +24,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.notary.common.InternalResult @@ -142,10 +143,6 @@ class JPAUniquenessProvider( fun encodeStateRef(s: StateRef): PersistentStateRef { return PersistentStateRef(s.txhash.toString(), s.index) } - - fun decodeStateRef(s: PersistentStateRef): StateRef { - return StateRef(txhash = SecureHash.create(s.txId), index = s.index) - } } /** @@ -215,15 +212,15 @@ class JPAUniquenessProvider( committedStates.addAll(existing) } - return committedStates.map { - val stateRef = StateRef(txhash = SecureHash.create(it.id.txId), index = it.id.index) + return committedStates.associate { + val stateRef = it.id.toStateRef() val consumingTxId = SecureHash.create(it.consumingTxHash) if (stateRef in references) { stateRef to StateConsumptionDetails(consumingTxId.reHash(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) } else { stateRef to StateConsumptionDetails(consumingTxId.reHash()) } - }.toMap() + } } private fun withRetry(block: () -> T): T { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt index 1efb349ca0..30cdbe7f59 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt @@ -28,12 +28,14 @@ import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.test.SampleCashSchemaV1 import net.corda.finance.test.SampleCashSchemaV2 import net.corda.finance.test.SampleCashSchemaV3 +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.schema.ContractStateAndRef import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSchemaV1 +import net.corda.node.services.vault.toStateRef import net.corda.node.testing.DummyFungibleContract import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -48,7 +50,6 @@ import net.corda.testing.internal.vault.VaultFiller import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.assertj.core.api.Assertions -import org.assertj.core.api.Assertions.`in` import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.hibernate.SessionFactory @@ -122,7 +123,14 @@ class HibernateConfigurationTest { services = object : MockServices(cordappPackages, BOB_NAME, mock().also { doReturn(null).whenever(it).verifyAndRegisterIdentity(argThat { name == BOB_NAME }) }, generateKeyPair(), dummyNotary.keyPair) { - override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, cordappClassloader).apply { start() } + override val vaultService = NodeVaultService( + Clock.systemUTC(), + keyManagementService, + servicesForResolution as NodeServicesForResolution, + database, + schemaService, + cordappClassloader + ).apply { start() } override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { for (stx in txs) { (validatedTransactions as WritableTransactionStorage).addTransaction(stx) @@ -183,7 +191,7 @@ class HibernateConfigurationTest { // execute query val queryResults = entityManager.createQuery(criteriaQuery).resultList val coins = queryResults.map { - services.loadState(toStateRef(it.stateRef!!)).data + services.loadState(it.stateRef!!.toStateRef()).data }.sumCash() assertThat(coins.toDecimal() >= BigDecimal("50.00")) } @@ -739,7 +747,7 @@ class HibernateConfigurationTest { val queryResults = entityManager.createQuery(criteriaQuery).resultList queryResults.forEach { - val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State + val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State println("${it.stateRef} with owner: ${cashState.owner.owningKey.toBase58String()}") } @@ -823,7 +831,7 @@ class HibernateConfigurationTest { // execute query val queryResults = entityManager.createQuery(criteriaQuery).resultList queryResults.forEach { - val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State + val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State println("${it.stateRef} with owner ${cashState.owner.owningKey.toBase58String()} and participants ${cashState.participants.map { it.owningKey.toBase58String() }}") } @@ -961,10 +969,6 @@ class HibernateConfigurationTest { } } - private fun toStateRef(pStateRef: PersistentStateRef): StateRef { - return StateRef(SecureHash.create(pStateRef.txId), pStateRef.index) - } - @Test(timeout=300_000) fun `schema change`() { fun createNewDB(schemas: Set, initialiseSchema: Boolean = true): CordaPersistence { diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 1b139ab022..b06518667c 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -1674,7 +1674,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties { // pagination: last page @Test(timeout=300_000) - fun `all states with paging specification - last`() { + fun `all states with paging specification - last`() { database.transaction { vaultFiller.fillWithSomeTestCash(95.DOLLARS, notaryServices, 95, DUMMY_CASH_ISSUER) // Last page implies we need to perform a row count for the Query first, @@ -1723,7 +1723,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties { @Test(timeout=300_000) fun `pagination not specified but more than default results available`() { expectedEx.expect(VaultQueryException::class.java) - expectedEx.expectMessage("provide a `PageSpecification(pageNumber, pageSize)`") + expectedEx.expectMessage("provide a PageSpecification") database.transaction { vaultFiller.fillWithSomeTestCash(201.DOLLARS, notaryServices, 201, DUMMY_CASH_ISSUER) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index 7e771e9904..ac621c9bff 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -10,7 +10,6 @@ import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.AbstractParty import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.uncheckedCast -import net.corda.core.node.ServicesForResolution import net.corda.core.node.services.KeyManagementService import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition @@ -29,6 +28,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.singleIdentity import net.corda.testing.flows.registerCoreFlowFactory import net.corda.coretesting.internal.rigorousMock +import net.corda.node.internal.NodeServicesForResolution import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.startFlow @@ -86,7 +86,7 @@ class VaultSoftLockManagerTest { private val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(enclosedCordapp()), defaultFactory = { args -> object : InternalMockNetwork.MockNode(args) { override fun makeVaultService(keyManagementService: KeyManagementService, - services: ServicesForResolution, + services: NodeServicesForResolution, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { val node = this diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index efd5813736..6dcb0db299 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -26,6 +26,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.VersionInfo import net.corda.node.internal.ServicesForResolutionImpl +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.services.api.* import net.corda.node.services.diagnostics.NodeDiagnosticsService @@ -460,7 +461,14 @@ open class MockServices private constructor( get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersService, validatedTransactions) internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { - return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, cordappLoader.appClassLoader).apply { start() } + return NodeVaultService( + clock, + keyManagementService, + servicesForResolution as NodeServicesForResolution, + database, + schemaService, + cordappLoader.appClassLoader + ).apply { start() } } // This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API