mirror of
https://github.com/corda/corda.git
synced 2025-06-16 06:08:13 +00:00
Merge branch 'os-4.8-merge-point' into shams-4.9-merge-0b8536b9
# Conflicts: # .github/workflows/check-pr-title.yml # node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt
This commit is contained in:
2
.github/workflows/check-pr-title.yml
vendored
2
.github/workflows/check-pr-title.yml
vendored
@ -9,6 +9,6 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- uses: morrisoncole/pr-lint-action@v1.4.1
|
- uses: morrisoncole/pr-lint-action@v1.4.1
|
||||||
with:
|
with:
|
||||||
title-regex: '^((CORDA|AG|EG|ENT|INFRA|NAAS|ES)-\d+|NOTICK)(.*)'
|
title-regex: '^((CORDA|AG|EG|ENT|INFRA|ES)-\d+|NOTICK)(.*)'
|
||||||
on-failed-regex-comment: "PR title failed to match regex -> `%regex%`"
|
on-failed-regex-comment: "PR title failed to match regex -> `%regex%`"
|
||||||
repo-token: "${{ secrets.GITHUB_TOKEN }}"
|
repo-token: "${{ secrets.GITHUB_TOKEN }}"
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
||||||
import net.corda.client.rpc.internal.SerializationEnvironmentHelper
|
import net.corda.client.rpc.internal.SerializationEnvironmentHelper
|
||||||
@ -52,7 +53,7 @@ class CordaRPCConnection private constructor(
|
|||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
classLoader: ClassLoader? = null
|
classLoader: ClassLoader? = null
|
||||||
): CordaRPCConnection {
|
): CordaRPCConnection {
|
||||||
val observersPool: ExecutorService = Executors.newCachedThreadPool()
|
val observersPool: ExecutorService = Executors.newCachedThreadPool(DefaultThreadFactory("RPCObserver"))
|
||||||
return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
|
return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
|
||||||
addresses,
|
addresses,
|
||||||
username,
|
username,
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.client.rpc.internal
|
package net.corda.client.rpc.internal
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory
|
||||||
import net.corda.client.rpc.ConnectionFailureException
|
import net.corda.client.rpc.ConnectionFailureException
|
||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
@ -99,7 +100,8 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
ErrorInterceptingHandler(reconnectingRPCConnection)) as CordaRPCOps
|
ErrorInterceptingHandler(reconnectingRPCConnection)) as CordaRPCOps
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
|
private val retryFlowsPool = Executors.newScheduledThreadPool(1, DefaultThreadFactory("FlowRetry"))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function runs a flow and retries until it completes successfully.
|
* This function runs a flow and retries until it completes successfully.
|
||||||
*
|
*
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
@file:Suppress("LongParameterList")
|
||||||
|
|
||||||
package net.corda.core.node.services
|
package net.corda.core.node.services
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
@ -197,8 +199,7 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
|||||||
* 4) Status types used in this query: [StateStatus.UNCONSUMED], [StateStatus.CONSUMED], [StateStatus.ALL].
|
* 4) Status types used in this query: [StateStatus.UNCONSUMED], [StateStatus.CONSUMED], [StateStatus.ALL].
|
||||||
* 5) Other results as a [List] of any type (eg. aggregate function results with/without group by).
|
* 5) Other results as a [List] of any type (eg. aggregate function results with/without group by).
|
||||||
*
|
*
|
||||||
* Note: currently otherResults are used only for Aggregate Functions (in which case, the states and statesMetadata
|
* Note: currently [otherResults] is used only for aggregate functions (in which case, [states] and [statesMetadata] will be empty).
|
||||||
* results will be empty).
|
|
||||||
*/
|
*/
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
data class Page<out T : ContractState>(val states: List<StateAndRef<T>>,
|
data class Page<out T : ContractState>(val states: List<StateAndRef<T>>,
|
||||||
@ -213,11 +214,11 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
|||||||
val contractStateClassName: String,
|
val contractStateClassName: String,
|
||||||
val recordedTime: Instant,
|
val recordedTime: Instant,
|
||||||
val consumedTime: Instant?,
|
val consumedTime: Instant?,
|
||||||
val status: Vault.StateStatus,
|
val status: StateStatus,
|
||||||
val notary: AbstractParty?,
|
val notary: AbstractParty?,
|
||||||
val lockId: String?,
|
val lockId: String?,
|
||||||
val lockUpdateTime: Instant?,
|
val lockUpdateTime: Instant?,
|
||||||
val relevancyStatus: Vault.RelevancyStatus? = null,
|
val relevancyStatus: RelevancyStatus? = null,
|
||||||
val constraintInfo: ConstraintInfo? = null
|
val constraintInfo: ConstraintInfo? = null
|
||||||
) {
|
) {
|
||||||
fun copy(
|
fun copy(
|
||||||
@ -225,7 +226,7 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
|||||||
contractStateClassName: String = this.contractStateClassName,
|
contractStateClassName: String = this.contractStateClassName,
|
||||||
recordedTime: Instant = this.recordedTime,
|
recordedTime: Instant = this.recordedTime,
|
||||||
consumedTime: Instant? = this.consumedTime,
|
consumedTime: Instant? = this.consumedTime,
|
||||||
status: Vault.StateStatus = this.status,
|
status: StateStatus = this.status,
|
||||||
notary: AbstractParty? = this.notary,
|
notary: AbstractParty? = this.notary,
|
||||||
lockId: String? = this.lockId,
|
lockId: String? = this.lockId,
|
||||||
lockUpdateTime: Instant? = this.lockUpdateTime
|
lockUpdateTime: Instant? = this.lockUpdateTime
|
||||||
@ -237,11 +238,11 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
|||||||
contractStateClassName: String = this.contractStateClassName,
|
contractStateClassName: String = this.contractStateClassName,
|
||||||
recordedTime: Instant = this.recordedTime,
|
recordedTime: Instant = this.recordedTime,
|
||||||
consumedTime: Instant? = this.consumedTime,
|
consumedTime: Instant? = this.consumedTime,
|
||||||
status: Vault.StateStatus = this.status,
|
status: StateStatus = this.status,
|
||||||
notary: AbstractParty? = this.notary,
|
notary: AbstractParty? = this.notary,
|
||||||
lockId: String? = this.lockId,
|
lockId: String? = this.lockId,
|
||||||
lockUpdateTime: Instant? = this.lockUpdateTime,
|
lockUpdateTime: Instant? = this.lockUpdateTime,
|
||||||
relevancyStatus: Vault.RelevancyStatus?
|
relevancyStatus: RelevancyStatus?
|
||||||
): StateMetadata {
|
): StateMetadata {
|
||||||
return StateMetadata(ref, contractStateClassName, recordedTime, consumedTime, status, notary, lockId, lockUpdateTime, relevancyStatus, ConstraintInfo(AlwaysAcceptAttachmentConstraint))
|
return StateMetadata(ref, contractStateClassName, recordedTime, consumedTime, status, notary, lockId, lockUpdateTime, relevancyStatus, ConstraintInfo(AlwaysAcceptAttachmentConstraint))
|
||||||
}
|
}
|
||||||
@ -249,9 +250,9 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
|
|||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
@Deprecated("No longer used. The vault does not emit empty updates")
|
@Deprecated("No longer used. The vault does not emit empty updates")
|
||||||
val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet())
|
val NoUpdate = Update(emptySet(), emptySet(), type = UpdateType.GENERAL, references = emptySet())
|
||||||
@Deprecated("No longer used. The vault does not emit empty updates")
|
@Deprecated("No longer used. The vault does not emit empty updates")
|
||||||
val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE, references = emptySet())
|
val NoNotaryUpdate = Update(emptySet(), emptySet(), type = UpdateType.NOTARY_CHANGE, references = emptySet())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,7 +303,7 @@ interface VaultService {
|
|||||||
fun whenConsumed(ref: StateRef): CordaFuture<Vault.Update<ContractState>> {
|
fun whenConsumed(ref: StateRef): CordaFuture<Vault.Update<ContractState>> {
|
||||||
val query = QueryCriteria.VaultQueryCriteria(
|
val query = QueryCriteria.VaultQueryCriteria(
|
||||||
stateRefs = listOf(ref),
|
stateRefs = listOf(ref),
|
||||||
status = Vault.StateStatus.CONSUMED
|
status = StateStatus.CONSUMED
|
||||||
)
|
)
|
||||||
val result = trackBy<ContractState>(query)
|
val result = trackBy<ContractState>(query)
|
||||||
val snapshot = result.snapshot.states
|
val snapshot = result.snapshot.states
|
||||||
@ -358,8 +359,8 @@ interface VaultService {
|
|||||||
/**
|
/**
|
||||||
* Helper function to determine spendable states and soft locking them.
|
* Helper function to determine spendable states and soft locking them.
|
||||||
* Currently performance will be worse than for the hand optimised version in
|
* Currently performance will be worse than for the hand optimised version in
|
||||||
* [Cash.unconsumedCashStatesForSpending]. However, this is fully generic and can operate with custom [FungibleState]
|
* [net.corda.finance.workflows.asset.selection.AbstractCashSelection.unconsumedCashStatesForSpending]. However, this is fully generic
|
||||||
* and [FungibleAsset] states.
|
* and can operate with custom [FungibleState] and [FungibleAsset] states.
|
||||||
* @param lockId The [FlowLogic.runId]'s [UUID] of the current flow used to soft lock the states.
|
* @param lockId The [FlowLogic.runId]'s [UUID] of the current flow used to soft lock the states.
|
||||||
* @param eligibleStatesQuery A custom query object that selects down to the appropriate subset of all states of the
|
* @param eligibleStatesQuery A custom query object that selects down to the appropriate subset of all states of the
|
||||||
* [contractStateType]. e.g. by selecting on account, issuer, etc. The query is internally augmented with the
|
* [contractStateType]. e.g. by selecting on account, issuer, etc. The query is internally augmented with the
|
||||||
|
@ -42,8 +42,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
|||||||
override fun start(): Started = synchronized(this) {
|
override fun start(): Started = synchronized(this) {
|
||||||
check(started == null) { "start can't be called twice" }
|
check(started == null) { "start can't be called twice" }
|
||||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
|
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
|
||||||
val backupTransports = backupServerAddressPool.map {
|
val backupTransports = backupServerAddressPool.mapIndexed { index, address ->
|
||||||
p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
|
p2pConnectorTcpTransport(address, config, threadPoolName = "$threadPoolName-backup${index+1}", trace = trace)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Connecting to message broker: $serverAddress")
|
log.info("Connecting to message broker: $serverAddress")
|
||||||
|
@ -122,6 +122,7 @@ class ArtemisTcpTransport {
|
|||||||
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||||
config: BrokerRpcSslOptions?,
|
config: BrokerRpcSslOptions?,
|
||||||
enableSSL: Boolean = true,
|
enableSSL: Boolean = true,
|
||||||
|
threadPoolName: String = "RPCServer",
|
||||||
trace: Boolean = false,
|
trace: Boolean = false,
|
||||||
remotingThreads: Int? = null): TransportConfiguration {
|
remotingThreads: Int? = null): TransportConfiguration {
|
||||||
val options = mutableMapOf<String, Any>()
|
val options = mutableMapOf<String, Any>()
|
||||||
@ -129,7 +130,7 @@ class ArtemisTcpTransport {
|
|||||||
config.keyStorePath.requireOnDefaultFileSystem()
|
config.keyStorePath.requireOnDefaultFileSystem()
|
||||||
options.putAll(config.toTransportOptions())
|
options.putAll(config.toTransportOptions())
|
||||||
}
|
}
|
||||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, "RPCServer", trace, remotingThreads)
|
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, threadPoolName, trace, remotingThreads)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||||
@ -147,14 +148,16 @@ class ArtemisTcpTransport {
|
|||||||
|
|
||||||
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
|
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||||
config: SslConfiguration,
|
config: SslConfiguration,
|
||||||
|
threadPoolName: String = "Internal-RPCClient",
|
||||||
trace: Boolean = false): TransportConfiguration {
|
trace: Boolean = false): TransportConfiguration {
|
||||||
val options = mutableMapOf<String, Any>()
|
val options = mutableMapOf<String, Any>()
|
||||||
config.addToTransportOptions(options)
|
config.addToTransportOptions(options)
|
||||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace, null)
|
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, threadPoolName, trace, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||||
config: SslConfiguration,
|
config: SslConfiguration,
|
||||||
|
threadPoolName: String = "Internal-RPCServer",
|
||||||
trace: Boolean = false,
|
trace: Boolean = false,
|
||||||
remotingThreads: Int? = null): TransportConfiguration {
|
remotingThreads: Int? = null): TransportConfiguration {
|
||||||
val options = mutableMapOf<String, Any>()
|
val options = mutableMapOf<String, Any>()
|
||||||
@ -165,7 +168,7 @@ class ArtemisTcpTransport {
|
|||||||
options,
|
options,
|
||||||
trustManagerFactory(requireNotNull(config.trustStore).get()),
|
trustManagerFactory(requireNotNull(config.trustStore).get()),
|
||||||
true,
|
true,
|
||||||
"Internal-RPCServer",
|
threadPoolName,
|
||||||
trace,
|
trace,
|
||||||
remotingThreads
|
remotingThreads
|
||||||
)
|
)
|
||||||
@ -210,7 +213,7 @@ class ArtemisTcpTransport {
|
|||||||
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
|
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
|
||||||
}
|
}
|
||||||
return createTransport(
|
return createTransport(
|
||||||
NodeNettyConnectorFactory::class.java.name,
|
CordaNettyConnectorFactory::class.java.name,
|
||||||
hostAndPort,
|
hostAndPort,
|
||||||
protocols,
|
protocols,
|
||||||
options,
|
options,
|
||||||
|
@ -1,8 +1,14 @@
|
|||||||
@file:JvmName("ArtemisUtils")
|
@file:JvmName("ArtemisUtils")
|
||||||
package net.corda.nodeapi.internal
|
package net.corda.nodeapi.internal
|
||||||
|
|
||||||
|
import net.corda.core.internal.declaredField
|
||||||
|
import org.apache.activemq.artemis.utils.actors.ProcessorBase
|
||||||
import java.nio.file.FileSystems
|
import java.nio.file.FileSystems
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
import java.util.concurrent.Executor
|
||||||
|
import java.util.concurrent.ThreadFactory
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Require that the [Path] is on a default file system, and therefore is one that Artemis is willing to use.
|
* Require that the [Path] is on a default file system, and therefore is one that Artemis is willing to use.
|
||||||
@ -16,3 +22,29 @@ fun requireMessageSize(messageSize: Int, limit: Int) {
|
|||||||
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
|
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val Executor.rootExecutor: Executor get() {
|
||||||
|
var executor: Executor = this
|
||||||
|
while (executor is ProcessorBase<*>) {
|
||||||
|
executor = executor.declaredField<Executor>("delegate").value
|
||||||
|
}
|
||||||
|
return executor
|
||||||
|
}
|
||||||
|
|
||||||
|
fun Executor.setThreadPoolName(threadPoolName: String) {
|
||||||
|
(rootExecutor as? ThreadPoolExecutor)?.let { it.threadFactory = NamedThreadFactory(threadPoolName, it.threadFactory) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private class NamedThreadFactory(poolName: String, private val delegate: ThreadFactory) : ThreadFactory {
|
||||||
|
companion object {
|
||||||
|
private val poolId = AtomicInteger(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val prefix = "$poolName-${poolId.incrementAndGet()}-"
|
||||||
|
private val nextId = AtomicInteger(0)
|
||||||
|
|
||||||
|
override fun newThread(r: Runnable): Thread {
|
||||||
|
val thread = delegate.newThread(r)
|
||||||
|
thread.name = "$prefix${nextId.incrementAndGet()}"
|
||||||
|
return thread
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -14,15 +14,16 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper
|
|||||||
import java.util.concurrent.Executor
|
import java.util.concurrent.Executor
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
|
|
||||||
class NodeNettyConnectorFactory : ConnectorFactory {
|
class CordaNettyConnectorFactory : ConnectorFactory {
|
||||||
override fun createConnector(configuration: MutableMap<String, Any>?,
|
override fun createConnector(configuration: MutableMap<String, Any>?,
|
||||||
handler: BufferHandler?,
|
handler: BufferHandler?,
|
||||||
listener: ClientConnectionLifeCycleListener?,
|
listener: ClientConnectionLifeCycleListener?,
|
||||||
closeExecutor: Executor?,
|
closeExecutor: Executor,
|
||||||
threadPool: Executor?,
|
threadPool: Executor,
|
||||||
scheduledThreadPool: ScheduledExecutorService?,
|
scheduledThreadPool: ScheduledExecutorService,
|
||||||
protocolManager: ClientProtocolManager?): Connector {
|
protocolManager: ClientProtocolManager?): Connector {
|
||||||
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
|
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
|
||||||
|
setThreadPoolName(threadPool, closeExecutor, scheduledThreadPool, threadPoolName)
|
||||||
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
||||||
return NettyConnector(
|
return NettyConnector(
|
||||||
configuration,
|
configuration,
|
||||||
@ -31,7 +32,7 @@ class NodeNettyConnectorFactory : ConnectorFactory {
|
|||||||
closeExecutor,
|
closeExecutor,
|
||||||
threadPool,
|
threadPool,
|
||||||
scheduledThreadPool,
|
scheduledThreadPool,
|
||||||
MyClientProtocolManager(threadPoolName, trace)
|
MyClientProtocolManager("$threadPoolName-netty", trace)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,6 +40,17 @@ class NodeNettyConnectorFactory : ConnectorFactory {
|
|||||||
|
|
||||||
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
|
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
|
||||||
|
|
||||||
|
private fun setThreadPoolName(threadPool: Executor, closeExecutor: Executor, scheduledThreadPool: ScheduledExecutorService, name: String) {
|
||||||
|
threadPool.setThreadPoolName("$name-artemis")
|
||||||
|
// Artemis will actually wrap the same backing Executor to create multiple "OrderedExecutors". In this scenerio both the threadPool
|
||||||
|
// and the closeExecutor are the same when it comes to the pool names. If however they are different then given them separate names.
|
||||||
|
if (threadPool.rootExecutor !== closeExecutor.rootExecutor) {
|
||||||
|
closeExecutor.setThreadPoolName("$name-artemis-closer")
|
||||||
|
}
|
||||||
|
// The scheduler is separate
|
||||||
|
scheduledThreadPool.setThreadPoolName("$name-artemis-scheduler")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
|
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
|
||||||
override fun addChannelHandlers(pipeline: ChannelPipeline) {
|
override fun addChannelHandlers(pipeline: ChannelPipeline) {
|
@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
|||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
|
import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
|
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||||
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession
|
|||||||
import org.slf4j.MDC
|
import org.slf4j.MDC
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
import java.util.concurrent.ExecutorService
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.ScheduledFuture
|
import java.util.concurrent.ScheduledFuture
|
||||||
@ -53,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
maxMessageSize: Int,
|
maxMessageSize: Int,
|
||||||
revocationConfig: RevocationConfig,
|
revocationConfig: RevocationConfig,
|
||||||
enableSNI: Boolean,
|
enableSNI: Boolean,
|
||||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||||
private val bridgeMetricsService: BridgeMetricsService? = null,
|
private val bridgeMetricsService: BridgeMetricsService? = null,
|
||||||
trace: Boolean,
|
trace: Boolean,
|
||||||
sslHandshakeTimeout: Duration?,
|
sslHandshakeTimeout: Duration?,
|
||||||
@ -78,9 +80,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
|
|
||||||
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig,useOpenSSL, enableSNI, trace = trace, _sslHandshakeTimeout = sslHandshakeTimeout)
|
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig,useOpenSSL, enableSNI, trace = trace, _sslHandshakeTimeout = sslHandshakeTimeout)
|
||||||
private var sharedEventLoopGroup: EventLoopGroup? = null
|
private var sharedEventLoopGroup: EventLoopGroup? = null
|
||||||
|
private var sslDelegatedTaskExecutor: ExecutorService? = null
|
||||||
private var artemis: ArtemisSessionProvider? = null
|
private var artemis: ArtemisSessionProvider? = null
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
private val log = contextLogger()
|
||||||
|
|
||||||
private const val CORDA_NUM_BRIDGE_THREADS_PROP_NAME = "net.corda.nodeapi.amqpbridgemanager.NumBridgeThreads"
|
private const val CORDA_NUM_BRIDGE_THREADS_PROP_NAME = "net.corda.nodeapi.amqpbridgemanager.NumBridgeThreads"
|
||||||
|
|
||||||
@ -97,18 +101,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
* however Artemis and the remote Corda instanced will deduplicate these messages.
|
* however Artemis and the remote Corda instanced will deduplicate these messages.
|
||||||
*/
|
*/
|
||||||
@Suppress("TooManyFunctions")
|
@Suppress("TooManyFunctions")
|
||||||
private class AMQPBridge(val sourceX500Name: String,
|
private inner class AMQPBridge(val sourceX500Name: String,
|
||||||
val queueName: String,
|
val queueName: String,
|
||||||
val targets: List<NetworkHostAndPort>,
|
val targets: List<NetworkHostAndPort>,
|
||||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||||
private val amqpConfig: AMQPConfiguration,
|
private val amqpConfig: AMQPConfiguration) {
|
||||||
sharedEventGroup: EventLoopGroup,
|
|
||||||
private val artemis: ArtemisSessionProvider,
|
|
||||||
private val bridgeMetricsService: BridgeMetricsService?,
|
|
||||||
private val bridgeConnectionTTLSeconds: Int) {
|
|
||||||
companion object {
|
|
||||||
private val log = contextLogger()
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun withMDC(block: () -> Unit) {
|
private fun withMDC(block: () -> Unit) {
|
||||||
val oldMDC = MDC.getCopyOfContextMap() ?: emptyMap<String, String>()
|
val oldMDC = MDC.getCopyOfContextMap() ?: emptyMap<String, String>()
|
||||||
@ -134,13 +131,18 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
|
|
||||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||||
|
|
||||||
val amqpClient = AMQPClient(targets, allowedRemoteLegalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
|
val amqpClient = AMQPClient(
|
||||||
|
targets,
|
||||||
|
allowedRemoteLegalNames,
|
||||||
|
amqpConfig,
|
||||||
|
AMQPClient.NettyThreading.Shared(sharedEventLoopGroup!!, sslDelegatedTaskExecutor!!)
|
||||||
|
)
|
||||||
private var session: ClientSession? = null
|
private var session: ClientSession? = null
|
||||||
private var consumer: ClientConsumer? = null
|
private var consumer: ClientConsumer? = null
|
||||||
private var connectedSubscription: Subscription? = null
|
private var connectedSubscription: Subscription? = null
|
||||||
@Volatile
|
@Volatile
|
||||||
private var messagesReceived: Boolean = false
|
private var messagesReceived: Boolean = false
|
||||||
private val eventLoop: EventLoop = sharedEventGroup.next()
|
private val eventLoop: EventLoop = sharedEventLoopGroup!!.next()
|
||||||
private var artemisState: ArtemisState = ArtemisState.STOPPED
|
private var artemisState: ArtemisState = ArtemisState.STOPPED
|
||||||
set(value) {
|
set(value) {
|
||||||
logDebugWithMDC { "State change $field to $value" }
|
logDebugWithMDC { "State change $field to $value" }
|
||||||
@ -152,32 +154,9 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
private var scheduledExecutorService: ScheduledExecutorService
|
private var scheduledExecutorService: ScheduledExecutorService
|
||||||
= Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("bridge-connection-reset-%d").build())
|
= Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("bridge-connection-reset-%d").build())
|
||||||
|
|
||||||
@Suppress("ClassNaming")
|
|
||||||
private sealed class ArtemisState {
|
|
||||||
object STARTING : ArtemisState()
|
|
||||||
data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
|
||||||
|
|
||||||
object CHECKING : ArtemisState()
|
|
||||||
object RESTARTED : ArtemisState()
|
|
||||||
object RECEIVING : ArtemisState()
|
|
||||||
|
|
||||||
object AMQP_STOPPED : ArtemisState()
|
|
||||||
object AMQP_STARTING : ArtemisState()
|
|
||||||
object AMQP_STARTED : ArtemisState()
|
|
||||||
object AMQP_RESTARTED : ArtemisState()
|
|
||||||
|
|
||||||
object STOPPING : ArtemisState()
|
|
||||||
object STOPPED : ArtemisState()
|
|
||||||
data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
|
||||||
|
|
||||||
open val pending: ScheduledFuture<Unit>? = null
|
|
||||||
|
|
||||||
override fun toString(): String = javaClass.simpleName
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun artemis(inProgress: ArtemisState, block: (precedingState: ArtemisState) -> ArtemisState) {
|
private fun artemis(inProgress: ArtemisState, block: (precedingState: ArtemisState) -> ArtemisState) {
|
||||||
val runnable = {
|
val runnable = {
|
||||||
synchronized(artemis) {
|
synchronized(artemis!!) {
|
||||||
try {
|
try {
|
||||||
val precedingState = artemisState
|
val precedingState = artemisState
|
||||||
artemisState.pending?.cancel(false)
|
artemisState.pending?.cancel(false)
|
||||||
@ -253,7 +232,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
artemis(ArtemisState.STARTING) {
|
artemis(ArtemisState.STARTING) {
|
||||||
val startedArtemis = artemis.started
|
val startedArtemis = artemis!!.started
|
||||||
if (startedArtemis == null) {
|
if (startedArtemis == null) {
|
||||||
logInfoWithMDC("Bridge Connected but Artemis is disconnected")
|
logInfoWithMDC("Bridge Connected but Artemis is disconnected")
|
||||||
ArtemisState.STOPPED
|
ArtemisState.STOPPED
|
||||||
@ -457,6 +436,29 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("ClassNaming")
|
||||||
|
private sealed class ArtemisState {
|
||||||
|
object STARTING : ArtemisState()
|
||||||
|
data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||||
|
|
||||||
|
object CHECKING : ArtemisState()
|
||||||
|
object RESTARTED : ArtemisState()
|
||||||
|
object RECEIVING : ArtemisState()
|
||||||
|
|
||||||
|
object AMQP_STOPPED : ArtemisState()
|
||||||
|
object AMQP_STARTING : ArtemisState()
|
||||||
|
object AMQP_STARTED : ArtemisState()
|
||||||
|
object AMQP_RESTARTED : ArtemisState()
|
||||||
|
|
||||||
|
object STOPPING : ArtemisState()
|
||||||
|
object STOPPED : ArtemisState()
|
||||||
|
data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
|
||||||
|
|
||||||
|
open val pending: ScheduledFuture<Unit>? = null
|
||||||
|
|
||||||
|
override fun toString(): String = javaClass.simpleName
|
||||||
|
}
|
||||||
|
|
||||||
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
|
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
|
||||||
@ -467,8 +469,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
}
|
}
|
||||||
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize,
|
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize,
|
||||||
revocationConfig, useOpenSsl, enableSNI, sourceX500Name, trace, sslHandshakeTimeout) }
|
revocationConfig, useOpenSsl, enableSNI, sourceX500Name, trace, sslHandshakeTimeout) }
|
||||||
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!,
|
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig)
|
||||||
bridgeMetricsService, bridgeConnectionTTLSeconds)
|
|
||||||
bridges += newBridge
|
bridges += newBridge
|
||||||
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
||||||
newBridge
|
newBridge
|
||||||
@ -497,15 +498,16 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
// queueNamesToBridgesMap returns a mutable list, .toList converts it to a immutable list so it won't be changed by the [destroyBridge] method.
|
// queueNamesToBridgesMap returns a mutable list, .toList converts it to a immutable list so it won't be changed by the [destroyBridge] method.
|
||||||
val bridges = queueNamesToBridgesMap[queueName]?.toList()
|
val bridges = queueNamesToBridgesMap[queueName]?.toList()
|
||||||
destroyBridge(queueName, bridges?.flatMap { it.targets } ?: emptyList())
|
destroyBridge(queueName, bridges?.flatMap { it.targets } ?: emptyList())
|
||||||
bridges?.map {
|
bridges?.associate {
|
||||||
it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.allowedRemoteLegalNames.toList(), serviceAddress = false)
|
it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.allowedRemoteLegalNames.toList(), serviceAddress = false)
|
||||||
}?.toMap() ?: emptyMap()
|
} ?: emptyMap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
|
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("NettyBridge", Thread.MAX_PRIORITY))
|
||||||
val artemis = artemisMessageClientFactory()
|
sslDelegatedTaskExecutor = sslDelegatedTaskExecutor("NettyBridge")
|
||||||
|
val artemis = artemisMessageClientFactory("ArtemisBridge")
|
||||||
this.artemis = artemis
|
this.artemis = artemis
|
||||||
artemis.start()
|
artemis.start()
|
||||||
}
|
}
|
||||||
@ -522,6 +524,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
|||||||
sharedEventLoopGroup = null
|
sharedEventLoopGroup = null
|
||||||
queueNamesToBridgesMap.clear()
|
queueNamesToBridgesMap.clear()
|
||||||
artemis?.stop()
|
artemis?.stop()
|
||||||
|
sslDelegatedTaskExecutor?.shutdown()
|
||||||
|
sslDelegatedTaskExecutor = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -35,7 +35,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
|||||||
maxMessageSize: Int,
|
maxMessageSize: Int,
|
||||||
revocationConfig: RevocationConfig,
|
revocationConfig: RevocationConfig,
|
||||||
enableSNI: Boolean,
|
enableSNI: Boolean,
|
||||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||||
bridgeMetricsService: BridgeMetricsService? = null,
|
bridgeMetricsService: BridgeMetricsService? = null,
|
||||||
trace: Boolean = false,
|
trace: Boolean = false,
|
||||||
sslHandshakeTimeout: Duration? = null,
|
sslHandshakeTimeout: Duration? = null,
|
||||||
@ -80,7 +80,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
|
|||||||
bridgeNotifyQueue = "$BRIDGE_NOTIFY.$queueDisambiguityId"
|
bridgeNotifyQueue = "$BRIDGE_NOTIFY.$queueDisambiguityId"
|
||||||
|
|
||||||
bridgeManager.start()
|
bridgeManager.start()
|
||||||
val artemis = artemisMessageClientFactory()
|
val artemis = artemisMessageClientFactory("BridgeControl")
|
||||||
this.artemis = artemis
|
this.artemis = artemis
|
||||||
artemis.start()
|
artemis.start()
|
||||||
val artemisClient = artemis.started!!
|
val artemisClient = artemis.started!!
|
||||||
|
@ -37,7 +37,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
|
|||||||
maxMessageSize: Int,
|
maxMessageSize: Int,
|
||||||
revocationConfig: RevocationConfig,
|
revocationConfig: RevocationConfig,
|
||||||
enableSNI: Boolean,
|
enableSNI: Boolean,
|
||||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
|
||||||
private val bridgeMetricsService: BridgeMetricsService? = null,
|
private val bridgeMetricsService: BridgeMetricsService? = null,
|
||||||
private val isLocalInbox: (String) -> Boolean,
|
private val isLocalInbox: (String) -> Boolean,
|
||||||
trace: Boolean,
|
trace: Boolean,
|
||||||
@ -204,7 +204,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
|
|||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
super.start()
|
super.start()
|
||||||
val artemis = artemisMessageClientFactory()
|
val artemis = artemisMessageClientFactory("LoopbackBridge")
|
||||||
this.artemis = artemis
|
this.artemis = artemis
|
||||||
artemis.start()
|
artemis.start()
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,9 @@ import rx.Observable
|
|||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.lang.Long.min
|
import java.lang.Long.min
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
import java.util.concurrent.Executor
|
||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.ExecutorService
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
import kotlin.concurrent.withLock
|
import kotlin.concurrent.withLock
|
||||||
@ -61,8 +63,7 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
|
|||||||
class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||||
private val configuration: AMQPConfiguration,
|
private val configuration: AMQPConfiguration,
|
||||||
private val sharedThreadPool: EventLoopGroup? = null,
|
private val nettyThreading: NettyThreading = NettyThreading.NonShared("AMQPClient"),
|
||||||
private val threadPoolName: String = "AMQPClient",
|
|
||||||
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON) : AutoCloseable {
|
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON) : AutoCloseable {
|
||||||
companion object {
|
companion object {
|
||||||
init {
|
init {
|
||||||
@ -82,7 +83,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
private val lock = ReentrantLock()
|
private val lock = ReentrantLock()
|
||||||
@Volatile
|
@Volatile
|
||||||
private var started: Boolean = false
|
private var started: Boolean = false
|
||||||
private var workerGroup: EventLoopGroup? = null
|
|
||||||
@Volatile
|
@Volatile
|
||||||
private var clientChannel: Channel? = null
|
private var clientChannel: Channel? = null
|
||||||
// Offset into the list of targets, so that we can implement round-robin reconnect logic.
|
// Offset into the list of targets, so that we can implement round-robin reconnect logic.
|
||||||
@ -94,7 +94,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
private var amqpActive = false
|
private var amqpActive = false
|
||||||
@Volatile
|
@Volatile
|
||||||
private var amqpChannelHandler: ChannelHandler? = null
|
private var amqpChannelHandler: ChannelHandler? = null
|
||||||
private var sslDelegatedTaskExecutor: ExecutorService? = null
|
|
||||||
|
|
||||||
val localAddressString: String
|
val localAddressString: String
|
||||||
get() = clientChannel?.localAddress()?.toString() ?: "<unknownLocalAddress>"
|
get() = clientChannel?.localAddress()?.toString() ?: "<unknownLocalAddress>"
|
||||||
@ -123,7 +122,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
log.info("Failed to connect to $currentTarget", future.cause())
|
log.info("Failed to connect to $currentTarget", future.cause())
|
||||||
|
|
||||||
if (started) {
|
if (started) {
|
||||||
workerGroup?.schedule({
|
nettyThreading.eventLoopGroup.schedule({
|
||||||
nextTarget()
|
nextTarget()
|
||||||
restart()
|
restart()
|
||||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||||
@ -142,7 +141,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
clientChannel = null
|
clientChannel = null
|
||||||
if (started && !amqpActive) {
|
if (started && !amqpActive) {
|
||||||
log.debug { "Scheduling restart of $currentTarget (AMQP inactive)" }
|
log.debug { "Scheduling restart of $currentTarget (AMQP inactive)" }
|
||||||
workerGroup?.schedule({
|
nettyThreading.eventLoopGroup.schedule({
|
||||||
nextTarget()
|
nextTarget()
|
||||||
restart()
|
restart()
|
||||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||||
@ -198,7 +197,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
|
|
||||||
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration)
|
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration)
|
||||||
val target = parent.currentTarget
|
val target = parent.currentTarget
|
||||||
val delegatedTaskExecutor = checkNotNull(parent.sslDelegatedTaskExecutor)
|
|
||||||
val handler = if (parent.configuration.useOpenSsl) {
|
val handler = if (parent.configuration.useOpenSsl) {
|
||||||
createClientOpenSslHandler(
|
createClientOpenSslHandler(
|
||||||
target,
|
target,
|
||||||
@ -206,7 +204,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
wrappedKeyManagerFactory,
|
wrappedKeyManagerFactory,
|
||||||
trustManagerFactory,
|
trustManagerFactory,
|
||||||
ch.alloc(),
|
ch.alloc(),
|
||||||
delegatedTaskExecutor
|
parent.nettyThreading.sslDelegatedTaskExecutor
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
createClientSslHandler(
|
createClientSslHandler(
|
||||||
@ -214,7 +212,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
parent.allowedRemoteLegalNames,
|
parent.allowedRemoteLegalNames,
|
||||||
wrappedKeyManagerFactory,
|
wrappedKeyManagerFactory,
|
||||||
trustManagerFactory,
|
trustManagerFactory,
|
||||||
delegatedTaskExecutor
|
parent.nettyThreading.sslDelegatedTaskExecutor
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis()
|
handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis()
|
||||||
@ -256,7 +254,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
|
|
||||||
if (started && amqpActive) {
|
if (started && amqpActive) {
|
||||||
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
|
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
|
||||||
workerGroup?.schedule({
|
nettyThreading.eventLoopGroup.schedule({
|
||||||
nextTarget()
|
nextTarget()
|
||||||
restart()
|
restart()
|
||||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||||
@ -273,8 +271,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.info("Connect to: $currentTarget")
|
log.info("Connect to: $currentTarget")
|
||||||
sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
|
(nettyThreading as? NettyThreading.NonShared)?.start()
|
||||||
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
|
|
||||||
started = true
|
started = true
|
||||||
restart()
|
restart()
|
||||||
}
|
}
|
||||||
@ -286,7 +283,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
}
|
}
|
||||||
val bootstrap = Bootstrap()
|
val bootstrap = Bootstrap()
|
||||||
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
|
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
|
||||||
bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
|
bootstrap.group(nettyThreading.eventLoopGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
|
||||||
// Delegate DNS Resolution to the proxy side, if we are using proxy.
|
// Delegate DNS Resolution to the proxy side, if we are using proxy.
|
||||||
if (configuration.proxyConfig != null) {
|
if (configuration.proxyConfig != null) {
|
||||||
bootstrap.resolver(NoopAddressResolverGroup.INSTANCE)
|
bootstrap.resolver(NoopAddressResolverGroup.INSTANCE)
|
||||||
@ -300,16 +297,12 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
lock.withLock {
|
lock.withLock {
|
||||||
log.info("Stopping connection to: $currentTarget, Local address: $localAddressString")
|
log.info("Stopping connection to: $currentTarget, Local address: $localAddressString")
|
||||||
started = false
|
started = false
|
||||||
if (sharedThreadPool == null) {
|
if (nettyThreading is NettyThreading.NonShared) {
|
||||||
workerGroup?.shutdownGracefully()
|
nettyThreading.stop()
|
||||||
workerGroup?.terminationFuture()?.sync()
|
|
||||||
} else {
|
} else {
|
||||||
clientChannel?.close()?.sync()
|
clientChannel?.close()?.sync()
|
||||||
}
|
}
|
||||||
clientChannel = null
|
clientChannel = null
|
||||||
workerGroup = null
|
|
||||||
sslDelegatedTaskExecutor?.shutdown()
|
|
||||||
sslDelegatedTaskExecutor = null
|
|
||||||
log.info("Stopped connection to $currentTarget")
|
log.info("Stopped connection to $currentTarget")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,4 +343,36 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
|||||||
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
|
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
|
||||||
val onConnection: Observable<ConnectionChange>
|
val onConnection: Observable<ConnectionChange>
|
||||||
get() = _onConnection
|
get() = _onConnection
|
||||||
|
|
||||||
|
|
||||||
|
sealed class NettyThreading {
|
||||||
|
abstract val eventLoopGroup: EventLoopGroup
|
||||||
|
abstract val sslDelegatedTaskExecutor: Executor
|
||||||
|
|
||||||
|
class Shared(override val eventLoopGroup: EventLoopGroup,
|
||||||
|
override val sslDelegatedTaskExecutor: ExecutorService = sslDelegatedTaskExecutor("AMQPClient")) : NettyThreading()
|
||||||
|
|
||||||
|
class NonShared(val threadPoolName: String) : NettyThreading() {
|
||||||
|
private var _eventLoopGroup: NioEventLoopGroup? = null
|
||||||
|
override val eventLoopGroup: EventLoopGroup get() = checkNotNull(_eventLoopGroup)
|
||||||
|
|
||||||
|
private var _sslDelegatedTaskExecutor: ThreadPoolExecutor? = null
|
||||||
|
override val sslDelegatedTaskExecutor: ExecutorService get() = checkNotNull(_sslDelegatedTaskExecutor)
|
||||||
|
|
||||||
|
fun start() {
|
||||||
|
check(_eventLoopGroup == null)
|
||||||
|
check(_sslDelegatedTaskExecutor == null)
|
||||||
|
_eventLoopGroup = NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
|
||||||
|
_sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun stop() {
|
||||||
|
eventLoopGroup.shutdownGracefully()
|
||||||
|
eventLoopGroup.terminationFuture().sync()
|
||||||
|
sslDelegatedTaskExecutor.shutdown()
|
||||||
|
_eventLoopGroup = null
|
||||||
|
_sslDelegatedTaskExecutor = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,7 +301,7 @@ abstract class AbstractServerRevocationTest {
|
|||||||
listOf(NetworkHostAndPort("localhost", targetPort)),
|
listOf(NetworkHostAndPort("localhost", targetPort)),
|
||||||
setOf(CHARLIE_NAME),
|
setOf(CHARLIE_NAME),
|
||||||
amqpConfig,
|
amqpConfig,
|
||||||
threadPoolName = legalName.organisation,
|
nettyThreading = AMQPClient.NettyThreading.NonShared(legalName.organisation),
|
||||||
distPointCrlSource = CertDistPointCrlSource(connectTimeout = crlConnectTimeout)
|
distPointCrlSource = CertDistPointCrlSource(connectTimeout = crlConnectTimeout)
|
||||||
)
|
)
|
||||||
amqpClients += amqpClient
|
amqpClients += amqpClient
|
||||||
|
@ -509,7 +509,7 @@ class ProtonWrapperTests {
|
|||||||
listOf(NetworkHostAndPort("localhost", serverPort)),
|
listOf(NetworkHostAndPort("localhost", serverPort)),
|
||||||
setOf(ALICE_NAME),
|
setOf(ALICE_NAME),
|
||||||
amqpConfig,
|
amqpConfig,
|
||||||
sharedThreadPool = sharedEventGroup)
|
nettyThreading = AMQPClient.NettyThreading.Shared(sharedEventGroup))
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createServer(port: Int,
|
private fun createServer(port: Int,
|
||||||
|
@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry
|
|||||||
import com.google.common.collect.MutableClassToInstanceMap
|
import com.google.common.collect.MutableClassToInstanceMap
|
||||||
import com.google.common.util.concurrent.MoreExecutors
|
import com.google.common.util.concurrent.MoreExecutors
|
||||||
import com.zaxxer.hikari.pool.HikariPool
|
import com.zaxxer.hikari.pool.HikariPool
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory
|
||||||
import net.corda.common.logging.errorReporting.NodeDatabaseErrors
|
import net.corda.common.logging.errorReporting.NodeDatabaseErrors
|
||||||
import net.corda.confidential.SwapIdentitiesFlow
|
import net.corda.confidential.SwapIdentitiesFlow
|
||||||
import net.corda.core.CordaException
|
import net.corda.core.CordaException
|
||||||
@ -333,7 +334,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
private val schedulerService = makeNodeSchedulerService()
|
private val schedulerService = makeNodeSchedulerService()
|
||||||
|
|
||||||
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
|
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
|
||||||
private val shutdownExecutor = Executors.newSingleThreadExecutor()
|
private val shutdownExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("Shutdown"))
|
||||||
|
|
||||||
protected abstract val transactionVerifierWorkerCount: Int
|
protected abstract val transactionVerifierWorkerCount: Int
|
||||||
/**
|
/**
|
||||||
@ -769,7 +770,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
} else {
|
} else {
|
||||||
1.days
|
1.days
|
||||||
}
|
}
|
||||||
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater"))
|
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("NetworkMapPublisher"))
|
||||||
executor.submit(object : Runnable {
|
executor.submit(object : Runnable {
|
||||||
override fun run() {
|
override fun run() {
|
||||||
val republishInterval = try {
|
val republishInterval = try {
|
||||||
@ -1076,7 +1077,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
networkParameters: NetworkParameters)
|
networkParameters: NetworkParameters)
|
||||||
|
|
||||||
protected open fun makeVaultService(keyManagementService: KeyManagementService,
|
protected open fun makeVaultService(keyManagementService: KeyManagementService,
|
||||||
services: ServicesForResolution,
|
services: NodeServicesForResolution,
|
||||||
database: CordaPersistence,
|
database: CordaPersistence,
|
||||||
cordappLoader: CordappLoader): VaultServiceInternal {
|
cordappLoader: CordappLoader): VaultServiceInternal {
|
||||||
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, cordappLoader.appClassLoader)
|
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, cordappLoader.appClassLoader)
|
||||||
|
@ -415,12 +415,13 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters): BridgeControlListener {
|
private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters): BridgeControlListener {
|
||||||
val artemisMessagingClientFactory = {
|
val artemisMessagingClientFactory = { threadPoolName: String ->
|
||||||
ArtemisMessagingClient(
|
ArtemisMessagingClient(
|
||||||
configuration.p2pSslOptions,
|
configuration.p2pSslOptions,
|
||||||
serverAddress,
|
serverAddress,
|
||||||
networkParameters.maxMessageSize,
|
networkParameters.maxMessageSize,
|
||||||
failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) }
|
failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) },
|
||||||
|
threadPoolName = threadPoolName
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return BridgeControlListener(
|
return BridgeControlListener(
|
||||||
@ -431,7 +432,8 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
networkParameters.maxMessageSize,
|
networkParameters.maxMessageSize,
|
||||||
configuration.crlCheckSoftFail.toRevocationConfig(),
|
configuration.crlCheckSoftFail.toRevocationConfig(),
|
||||||
false,
|
false,
|
||||||
artemisMessagingClientFactory)
|
artemisMessagingClientFactory
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
|
private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
package net.corda.node.internal
|
||||||
|
|
||||||
|
import net.corda.core.contracts.ContractState
|
||||||
|
import net.corda.core.contracts.StateAndRef
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
|
import net.corda.core.contracts.TransactionResolutionException
|
||||||
|
import net.corda.core.node.ServicesForResolution
|
||||||
|
import java.util.LinkedHashSet
|
||||||
|
|
||||||
|
interface NodeServicesForResolution : ServicesForResolution {
|
||||||
|
@Throws(TransactionResolutionException::class)
|
||||||
|
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> = loadStates(stateRefs, LinkedHashSet())
|
||||||
|
|
||||||
|
fun <T : ContractState, C : MutableCollection<StateAndRef<T>>> loadStates(input: Iterable<StateRef>, output: C): C
|
||||||
|
}
|
@ -1,11 +1,18 @@
|
|||||||
package net.corda.node.internal
|
package net.corda.node.internal
|
||||||
|
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.Attachment
|
||||||
|
import net.corda.core.contracts.AttachmentResolutionException
|
||||||
|
import net.corda.core.contracts.ContractAttachment
|
||||||
|
import net.corda.core.contracts.ContractState
|
||||||
|
import net.corda.core.contracts.StateAndRef
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
|
import net.corda.core.contracts.TransactionResolutionException
|
||||||
|
import net.corda.core.contracts.TransactionState
|
||||||
import net.corda.core.cordapp.CordappProvider
|
import net.corda.core.cordapp.CordappProvider
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.internal.SerializedStateAndRef
|
import net.corda.core.internal.SerializedStateAndRef
|
||||||
|
import net.corda.core.internal.uncheckedCast
|
||||||
import net.corda.core.node.NetworkParameters
|
import net.corda.core.node.NetworkParameters
|
||||||
import net.corda.core.node.ServicesForResolution
|
|
||||||
import net.corda.core.node.services.AttachmentStorage
|
import net.corda.core.node.services.AttachmentStorage
|
||||||
import net.corda.core.node.services.IdentityService
|
import net.corda.core.node.services.IdentityService
|
||||||
import net.corda.core.node.services.NetworkParametersService
|
import net.corda.core.node.services.NetworkParametersService
|
||||||
@ -23,7 +30,7 @@ data class ServicesForResolutionImpl(
|
|||||||
override val cordappProvider: CordappProvider,
|
override val cordappProvider: CordappProvider,
|
||||||
override val networkParametersService: NetworkParametersService,
|
override val networkParametersService: NetworkParametersService,
|
||||||
private val validatedTransactions: TransactionStorage
|
private val validatedTransactions: TransactionStorage
|
||||||
) : ServicesForResolution {
|
) : NodeServicesForResolution {
|
||||||
override val networkParameters: NetworkParameters get() = networkParametersService.lookup(networkParametersService.currentHash) ?:
|
override val networkParameters: NetworkParameters get() = networkParametersService.lookup(networkParametersService.currentHash) ?:
|
||||||
throw IllegalArgumentException("No current parameters in network parameters storage")
|
throw IllegalArgumentException("No current parameters in network parameters storage")
|
||||||
|
|
||||||
@ -32,12 +39,11 @@ data class ServicesForResolutionImpl(
|
|||||||
return toBaseTransaction(stateRef.txhash).outputs[stateRef.index]
|
return toBaseTransaction(stateRef.txhash).outputs[stateRef.index]
|
||||||
}
|
}
|
||||||
|
|
||||||
@Throws(TransactionResolutionException::class)
|
override fun <T : ContractState, C : MutableCollection<StateAndRef<T>>> loadStates(input: Iterable<StateRef>, output: C): C {
|
||||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
|
||||||
val baseTxs = HashMap<SecureHash, BaseTransaction>()
|
val baseTxs = HashMap<SecureHash, BaseTransaction>()
|
||||||
return stateRefs.mapTo(LinkedHashSet()) { stateRef ->
|
return input.mapTo(output) { stateRef ->
|
||||||
val baseTx = baseTxs.computeIfAbsent(stateRef.txhash, ::toBaseTransaction)
|
val baseTx = baseTxs.computeIfAbsent(stateRef.txhash, ::toBaseTransaction)
|
||||||
StateAndRef(baseTx.outputs[stateRef.index], stateRef)
|
StateAndRef(uncheckedCast(baseTx.outputs[stateRef.index]), stateRef)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ package net.corda.node.migration
|
|||||||
|
|
||||||
import liquibase.database.Database
|
import liquibase.database.Database
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.node.services.Vault
|
import net.corda.core.node.services.Vault
|
||||||
import net.corda.core.schemas.MappedSchema
|
import net.corda.core.schemas.MappedSchema
|
||||||
@ -19,6 +18,7 @@ import net.corda.node.services.persistence.DBTransactionStorage
|
|||||||
import net.corda.node.services.persistence.NodeAttachmentService
|
import net.corda.node.services.persistence.NodeAttachmentService
|
||||||
import net.corda.node.services.vault.NodeVaultService
|
import net.corda.node.services.vault.NodeVaultService
|
||||||
import net.corda.node.services.vault.VaultSchemaV1
|
import net.corda.node.services.vault.VaultSchemaV1
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||||
@ -62,8 +62,7 @@ class VaultStateMigration : CordaMigration() {
|
|||||||
private fun getStateAndRef(persistentState: VaultSchemaV1.VaultStates): StateAndRef<ContractState> {
|
private fun getStateAndRef(persistentState: VaultSchemaV1.VaultStates): StateAndRef<ContractState> {
|
||||||
val persistentStateRef = persistentState.stateRef ?:
|
val persistentStateRef = persistentState.stateRef ?:
|
||||||
throw VaultStateMigrationException("Persistent state ref missing from state")
|
throw VaultStateMigrationException("Persistent state ref missing from state")
|
||||||
val txHash = SecureHash.create(persistentStateRef.txId)
|
val stateRef = persistentStateRef.toStateRef()
|
||||||
val stateRef = StateRef(txHash, persistentStateRef.index)
|
|
||||||
val state = try {
|
val state = try {
|
||||||
servicesForResolution.loadState(stateRef)
|
servicesForResolution.loadState(stateRef)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
@ -2,6 +2,7 @@ package net.corda.node.services.events
|
|||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.context.InvocationContext
|
import net.corda.core.context.InvocationContext
|
||||||
import net.corda.core.context.InvocationOrigin
|
import net.corda.core.context.InvocationOrigin
|
||||||
@ -148,7 +149,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
// from the database
|
// from the database
|
||||||
private val startingStateRefs: MutableSet<ScheduledStateRef> = ConcurrentHashMap.newKeySet<ScheduledStateRef>()
|
private val startingStateRefs: MutableSet<ScheduledStateRef> = ConcurrentHashMap.newKeySet<ScheduledStateRef>()
|
||||||
private val mutex = ThreadBox(InnerState())
|
private val mutex = ThreadBox(InnerState())
|
||||||
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("SchedulerService"))
|
||||||
|
|
||||||
// if there's nothing to do, check every minute if something fell through the cracks.
|
// if there's nothing to do, check every minute if something fell through the cracks.
|
||||||
// any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect
|
// any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect
|
||||||
|
@ -2,8 +2,8 @@ package net.corda.node.services.events
|
|||||||
|
|
||||||
import net.corda.core.contracts.ScheduledStateRef
|
import net.corda.core.contracts.ScheduledStateRef
|
||||||
import net.corda.core.contracts.StateRef
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.core.schemas.PersistentStateRef
|
import net.corda.core.schemas.PersistentStateRef
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
|
|
||||||
interface ScheduledFlowRepository {
|
interface ScheduledFlowRepository {
|
||||||
@ -25,9 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair<StateRef, ScheduledStateRef> {
|
private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair<StateRef, ScheduledStateRef> {
|
||||||
val txId = scheduledStateRecord.output.txId
|
val stateRef = scheduledStateRecord.output.toStateRef()
|
||||||
val index = scheduledStateRecord.output.index
|
return Pair(stateRef, ScheduledStateRef(stateRef, scheduledStateRecord.scheduledAt))
|
||||||
return Pair(StateRef(SecureHash.create(txId), index), ScheduledStateRef(StateRef(SecureHash.create(txId), index), scheduledStateRecord.scheduledAt))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun delete(key: StateRef): Boolean {
|
override fun delete(key: StateRef): Boolean {
|
||||||
|
@ -65,7 +65,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
private val messagingServerAddress: NetworkHostAndPort,
|
private val messagingServerAddress: NetworkHostAndPort,
|
||||||
private val maxMessageSize: Int,
|
private val maxMessageSize: Int,
|
||||||
private val journalBufferTimeout : Int? = null,
|
private val journalBufferTimeout : Int? = null,
|
||||||
private val threadPoolName: String = "ArtemisServer",
|
private val threadPoolName: String = "P2PServer",
|
||||||
private val trace: Boolean = false,
|
private val trace: Boolean = false,
|
||||||
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON,
|
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON,
|
||||||
private val remotingThreads: Int? = null) : ArtemisBroker, SingletonSerializeAsToken() {
|
private val remotingThreads: Int? = null) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||||
|
@ -11,6 +11,8 @@ import net.corda.core.internal.declaredField
|
|||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
|
import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
|
||||||
|
import net.corda.nodeapi.internal.setThreadPoolName
|
||||||
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.BaseInterceptor
|
import org.apache.activemq.artemis.api.core.BaseInterceptor
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
|
||||||
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler
|
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler
|
||||||
@ -41,10 +43,23 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
|
|||||||
handler: BufferHandler?,
|
handler: BufferHandler?,
|
||||||
listener: ServerConnectionLifeCycleListener?,
|
listener: ServerConnectionLifeCycleListener?,
|
||||||
threadPool: Executor,
|
threadPool: Executor,
|
||||||
scheduledThreadPool: ScheduledExecutorService?,
|
scheduledThreadPool: ScheduledExecutorService,
|
||||||
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?): Acceptor {
|
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?): Acceptor {
|
||||||
|
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Acceptor", configuration)
|
||||||
|
threadPool.setThreadPoolName("$threadPoolName-artemis")
|
||||||
|
scheduledThreadPool.setThreadPoolName("$threadPoolName-artemis-scheduler")
|
||||||
val failureExecutor = OrderedExecutor(threadPool)
|
val failureExecutor = OrderedExecutor(threadPool)
|
||||||
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
return NodeNettyAcceptor(
|
||||||
|
name,
|
||||||
|
clusterConnection,
|
||||||
|
configuration,
|
||||||
|
handler,
|
||||||
|
listener,
|
||||||
|
scheduledThreadPool,
|
||||||
|
failureExecutor,
|
||||||
|
protocolMap,
|
||||||
|
"$threadPoolName-netty"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -55,7 +70,8 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
|
|||||||
listener: ServerConnectionLifeCycleListener?,
|
listener: ServerConnectionLifeCycleListener?,
|
||||||
scheduledThreadPool: ScheduledExecutorService?,
|
scheduledThreadPool: ScheduledExecutorService?,
|
||||||
failureExecutor: Executor,
|
failureExecutor: Executor,
|
||||||
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?) :
|
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?,
|
||||||
|
private val threadPoolName: String) :
|
||||||
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
||||||
{
|
{
|
||||||
companion object {
|
companion object {
|
||||||
@ -68,7 +84,6 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration)
|
|
||||||
private val sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
|
private val sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
|
||||||
private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
||||||
|
|
||||||
|
@ -74,7 +74,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
|
private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
|
||||||
private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread")).apply {
|
private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("NetworkMapUpdater")).apply {
|
||||||
executeExistingDelayedTasksAfterShutdownPolicy = false
|
executeExistingDelayedTasksAfterShutdownPolicy = false
|
||||||
}
|
}
|
||||||
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
|
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
|
||||||
@ -261,9 +261,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
|||||||
//as HTTP GET is mostly IO bound, use more threads than CPU's
|
//as HTTP GET is mostly IO bound, use more threads than CPU's
|
||||||
//maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
|
//maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
|
||||||
val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24)
|
val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24)
|
||||||
val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread"))
|
val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(
|
||||||
|
threadsToUseForNetworkMapDownload,
|
||||||
|
NamedThreadFactory("NetworkMapUpdaterNodeInfoDownload")
|
||||||
|
)
|
||||||
//DB insert is single threaded - use a single threaded executor for it.
|
//DB insert is single threaded - use a single threaded executor for it.
|
||||||
val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread"))
|
val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsert"))
|
||||||
val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes)
|
val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes)
|
||||||
val networkMapDownloadStartTime = System.currentTimeMillis()
|
val networkMapDownloadStartTime = System.currentTimeMillis()
|
||||||
if (hashesToFetch.isNotEmpty()) {
|
if (hashesToFetch.isNotEmpty()) {
|
||||||
|
@ -22,8 +22,7 @@ class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serv
|
|||||||
private var rpcServer: RPCServer? = null
|
private var rpcServer: RPCServer? = null
|
||||||
|
|
||||||
fun init(rpcOps: List<RPCOps>, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) {
|
fun init(rpcOps: List<RPCOps>, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) {
|
||||||
|
val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig, threadPoolName = "RPCClient")
|
||||||
val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig)
|
|
||||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||||
// would be the default and the two lines below can be deleted.
|
// would be the default and the two lines below can be deleted.
|
||||||
|
@ -30,10 +30,10 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
|||||||
setDirectories(baseDirectory)
|
setDirectories(baseDirectory)
|
||||||
|
|
||||||
val acceptorConfigurationsSet = mutableSetOf(
|
val acceptorConfigurationsSet = mutableSetOf(
|
||||||
rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl)
|
rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl, threadPoolName = "RPCServer")
|
||||||
)
|
)
|
||||||
adminAddress?.let {
|
adminAddress?.let {
|
||||||
acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration)
|
acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration, threadPoolName = "RPCServerAdmin")
|
||||||
}
|
}
|
||||||
acceptorConfigurations = acceptorConfigurationsSet
|
acceptorConfigurations = acceptorConfigurationsSet
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.node.services.statemachine
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory
|
||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
import net.corda.core.internal.FlowIORequest
|
import net.corda.core.internal.FlowIORequest
|
||||||
import net.corda.core.internal.FlowStateMachine
|
import net.corda.core.internal.FlowStateMachine
|
||||||
@ -22,10 +23,6 @@ internal class FlowMonitor(
|
|||||||
) : LifecycleSupport {
|
) : LifecycleSupport {
|
||||||
|
|
||||||
private companion object {
|
private companion object {
|
||||||
private fun defaultScheduler(): ScheduledExecutorService {
|
|
||||||
return Executors.newSingleThreadScheduledExecutor()
|
|
||||||
}
|
|
||||||
|
|
||||||
private val logger = loggerFor<FlowMonitor>()
|
private val logger = loggerFor<FlowMonitor>()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,7 +33,7 @@ internal class FlowMonitor(
|
|||||||
override fun start() {
|
override fun start() {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
if (scheduler == null) {
|
if (scheduler == null) {
|
||||||
scheduler = defaultScheduler()
|
scheduler = Executors.newSingleThreadScheduledExecutor(DefaultThreadFactory("FlowMonitor"))
|
||||||
shutdownScheduler = true
|
shutdownScheduler = true
|
||||||
}
|
}
|
||||||
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty() }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
|
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty() }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
|
||||||
|
@ -25,6 +25,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
@ -157,13 +158,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
|||||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||||
fromPersistentEntity = {
|
fromPersistentEntity = {
|
||||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||||
val txId = it.id.txId
|
Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash))
|
||||||
val index = it.id.index
|
|
||||||
Pair(
|
|
||||||
StateRef(txhash = SecureHash.create(txId), index = index),
|
|
||||||
SecureHash.create(it.consumingTxHash)
|
|
||||||
)
|
|
||||||
|
|
||||||
},
|
},
|
||||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||||
CommittedState(
|
CommittedState(
|
||||||
|
@ -3,28 +3,65 @@ package net.corda.node.services.vault
|
|||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import co.paralleluniverse.strands.Strand
|
import co.paralleluniverse.strands.Strand
|
||||||
import net.corda.core.CordaRuntimeException
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.Amount
|
||||||
|
import net.corda.core.contracts.ContractState
|
||||||
|
import net.corda.core.contracts.FungibleAsset
|
||||||
|
import net.corda.core.contracts.FungibleState
|
||||||
|
import net.corda.core.contracts.Issued
|
||||||
|
import net.corda.core.contracts.OwnableState
|
||||||
|
import net.corda.core.contracts.StateAndRef
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
|
import net.corda.core.contracts.TransactionState
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.containsAny
|
import net.corda.core.crypto.containsAny
|
||||||
import net.corda.core.flows.HospitalizeFlowException
|
import net.corda.core.flows.HospitalizeFlowException
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.ThreadBox
|
||||||
|
import net.corda.core.internal.TransactionDeserialisationException
|
||||||
|
import net.corda.core.internal.VisibleForTesting
|
||||||
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
|
import net.corda.core.internal.tee
|
||||||
|
import net.corda.core.internal.uncheckedCast
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.node.ServicesForResolution
|
|
||||||
import net.corda.core.node.StatesToRecord
|
import net.corda.core.node.StatesToRecord
|
||||||
import net.corda.core.node.services.*
|
import net.corda.core.node.services.KeyManagementService
|
||||||
import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo
|
import net.corda.core.node.services.StatesNotAvailableException
|
||||||
import net.corda.core.node.services.vault.*
|
import net.corda.core.node.services.Vault
|
||||||
|
import net.corda.core.node.services.VaultQueryException
|
||||||
|
import net.corda.core.node.services.VaultService
|
||||||
|
import net.corda.core.node.services.queryBy
|
||||||
|
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
|
||||||
|
import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE
|
||||||
|
import net.corda.core.node.services.vault.PageSpecification
|
||||||
|
import net.corda.core.node.services.vault.QueryCriteria
|
||||||
|
import net.corda.core.node.services.vault.Sort
|
||||||
|
import net.corda.core.node.services.vault.SortAttribute
|
||||||
|
import net.corda.core.node.services.vault.builder
|
||||||
import net.corda.core.observable.internal.OnResilientSubscribe
|
import net.corda.core.observable.internal.OnResilientSubscribe
|
||||||
import net.corda.core.schemas.PersistentStateRef
|
import net.corda.core.schemas.PersistentStateRef
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.transactions.*
|
import net.corda.core.transactions.ContractUpgradeWireTransaction
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.transactions.CoreTransaction
|
||||||
|
import net.corda.core.transactions.FullTransaction
|
||||||
|
import net.corda.core.transactions.LedgerTransaction
|
||||||
|
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||||
|
import net.corda.core.transactions.WireTransaction
|
||||||
|
import net.corda.core.utilities.NonEmptySet
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.core.utilities.debug
|
||||||
|
import net.corda.core.utilities.toNonEmptySet
|
||||||
|
import net.corda.core.utilities.trace
|
||||||
|
import net.corda.node.internal.NodeServicesForResolution
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.node.services.api.VaultServiceInternal
|
import net.corda.node.services.api.VaultServiceInternal
|
||||||
import net.corda.node.services.schema.PersistentStateService
|
import net.corda.node.services.schema.PersistentStateService
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||||
import net.corda.nodeapi.internal.persistence.*
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
|
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||||
|
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||||
|
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||||
|
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||||
import org.hibernate.Session
|
import org.hibernate.Session
|
||||||
|
import org.hibernate.query.Query
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.exceptions.OnErrorNotImplementedException
|
import rx.exceptions.OnErrorNotImplementedException
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -32,9 +69,11 @@ import java.security.PublicKey
|
|||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
import java.time.Clock
|
import java.time.Clock
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.Arrays
|
||||||
|
import java.util.UUID
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.CopyOnWriteArraySet
|
import java.util.concurrent.CopyOnWriteArraySet
|
||||||
|
import java.util.stream.Stream
|
||||||
import javax.persistence.PersistenceException
|
import javax.persistence.PersistenceException
|
||||||
import javax.persistence.Tuple
|
import javax.persistence.Tuple
|
||||||
import javax.persistence.criteria.CriteriaBuilder
|
import javax.persistence.criteria.CriteriaBuilder
|
||||||
@ -54,9 +93,9 @@ import javax.persistence.criteria.Root
|
|||||||
class NodeVaultService(
|
class NodeVaultService(
|
||||||
private val clock: Clock,
|
private val clock: Clock,
|
||||||
private val keyManagementService: KeyManagementService,
|
private val keyManagementService: KeyManagementService,
|
||||||
private val servicesForResolution: ServicesForResolution,
|
private val servicesForResolution: NodeServicesForResolution,
|
||||||
private val database: CordaPersistence,
|
private val database: CordaPersistence,
|
||||||
private val schemaService: SchemaService,
|
schemaService: SchemaService,
|
||||||
private val appClassloader: ClassLoader
|
private val appClassloader: ClassLoader
|
||||||
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
||||||
companion object {
|
companion object {
|
||||||
@ -196,7 +235,7 @@ class NodeVaultService(
|
|||||||
if (lockId != null) {
|
if (lockId != null) {
|
||||||
lockId = null
|
lockId = null
|
||||||
lockUpdateTime = clock.instant()
|
lockUpdateTime = clock.instant()
|
||||||
log.trace("Releasing soft lock on consumed state: $stateRef")
|
log.trace { "Releasing soft lock on consumed state: $stateRef" }
|
||||||
}
|
}
|
||||||
session.save(state)
|
session.save(state)
|
||||||
}
|
}
|
||||||
@ -227,7 +266,7 @@ class NodeVaultService(
|
|||||||
}
|
}
|
||||||
// we are not inside a flow, we are most likely inside a CordaService;
|
// we are not inside a flow, we are most likely inside a CordaService;
|
||||||
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
|
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
|
||||||
return _rawUpdatesPublisher.resilientOnError()
|
_rawUpdatesPublisher.resilientOnError()
|
||||||
}
|
}
|
||||||
|
|
||||||
override val updates: Observable<Vault.Update<ContractState>>
|
override val updates: Observable<Vault.Update<ContractState>>
|
||||||
@ -639,7 +678,23 @@ class NodeVaultService(
|
|||||||
@Throws(VaultQueryException::class)
|
@Throws(VaultQueryException::class)
|
||||||
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
|
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
|
||||||
try {
|
try {
|
||||||
return _queryBy(criteria, paging, sorting, contractStateType, false)
|
// We decrement by one if the client requests MAX_VALUE, assuming they can not notice this because they don't have enough memory
|
||||||
|
// to request MAX_VALUE states at once.
|
||||||
|
val validPaging = if (paging.pageSize == Integer.MAX_VALUE) {
|
||||||
|
paging.copy(pageSize = Integer.MAX_VALUE - 1)
|
||||||
|
} else {
|
||||||
|
checkVaultQuery(paging.pageSize >= 1) { "Page specification: invalid page size ${paging.pageSize} [minimum is 1]" }
|
||||||
|
paging
|
||||||
|
}
|
||||||
|
if (!validPaging.isDefault) {
|
||||||
|
checkVaultQuery(validPaging.pageNumber >= DEFAULT_PAGE_NUM) {
|
||||||
|
"Page specification: invalid page number ${validPaging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $validPaging, sorting: $sorting" }
|
||||||
|
return database.transaction {
|
||||||
|
queryBy(criteria, validPaging, sorting, contractStateType)
|
||||||
|
}
|
||||||
} catch (e: VaultQueryException) {
|
} catch (e: VaultQueryException) {
|
||||||
throw e
|
throw e
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
@ -647,100 +702,90 @@ class NodeVaultService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Throws(VaultQueryException::class)
|
private fun <T : ContractState> queryBy(criteria: QueryCriteria,
|
||||||
private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T> {
|
paging: PageSpecification,
|
||||||
// We decrement by one if the client requests MAX_PAGE_SIZE, assuming they can not notice this because they don't have enough memory
|
sorting: Sort,
|
||||||
// to request `MAX_PAGE_SIZE` states at once.
|
contractStateType: Class<out T>): Vault.Page<T> {
|
||||||
val paging = if (paging_.pageSize == Integer.MAX_VALUE) {
|
// calculate total results where a page specification has been defined
|
||||||
paging_.copy(pageSize = Integer.MAX_VALUE - 1)
|
val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType)
|
||||||
} else {
|
|
||||||
paging_
|
val (query, stateTypes) = createQuery(criteria, contractStateType, sorting)
|
||||||
|
query.setResultWindow(paging)
|
||||||
|
|
||||||
|
val statesMetadata: MutableList<Vault.StateMetadata> = mutableListOf()
|
||||||
|
val otherResults: MutableList<Any> = mutableListOf()
|
||||||
|
|
||||||
|
query.resultStream(paging).use { results ->
|
||||||
|
results.forEach { result ->
|
||||||
|
val result0 = result[0]
|
||||||
|
if (result0 is VaultSchemaV1.VaultStates) {
|
||||||
|
statesMetadata.add(result0.toStateMetadata())
|
||||||
|
} else {
|
||||||
|
log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" }
|
||||||
|
otherResults.addAll(result.toArray().asList())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting" }
|
|
||||||
return database.transaction {
|
|
||||||
// calculate total results where a page specification has been defined
|
|
||||||
var totalStates = -1L
|
|
||||||
if (!skipPagingChecks && !paging.isDefault) {
|
|
||||||
val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() }
|
|
||||||
val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL)
|
|
||||||
val results = _queryBy(criteria.and(countCriteria), PageSpecification(), Sort(emptyList()), contractStateType, true) // only skip pagination checks for total results count query
|
|
||||||
totalStates = results.otherResults.last() as Long
|
|
||||||
}
|
|
||||||
|
|
||||||
val session = getSession()
|
val states: List<StateAndRef<T>> = servicesForResolution.loadStates(
|
||||||
|
statesMetadata.mapTo(LinkedHashSet()) { it.ref },
|
||||||
|
ArrayList()
|
||||||
|
)
|
||||||
|
|
||||||
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
return Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults)
|
||||||
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
}
|
||||||
|
|
||||||
// TODO: revisit (use single instance of parser for all queries)
|
|
||||||
val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
|
|
||||||
|
|
||||||
// parse criteria and build where predicates
|
|
||||||
criteriaParser.parse(criteria, sorting)
|
|
||||||
|
|
||||||
// prepare query for execution
|
|
||||||
val query = session.createQuery(criteriaQuery)
|
|
||||||
|
|
||||||
// pagination checks
|
|
||||||
if (!skipPagingChecks && !paging.isDefault) {
|
|
||||||
// pagination
|
|
||||||
if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]")
|
|
||||||
if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [minimum is 1]")
|
|
||||||
if (paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum is $MAX_PAGE_SIZE]")
|
|
||||||
}
|
|
||||||
|
|
||||||
// For both SQLServer and PostgresSQL, firstResult must be >= 0. So we set a floor at 0.
|
|
||||||
// TODO: This is a catch-all solution. But why is the default pageNumber set to be -1 in the first place?
|
|
||||||
// Even if we set the default pageNumber to be 1 instead, that may not cover the non-default cases.
|
|
||||||
// So the floor may be necessary anyway.
|
|
||||||
query.firstResult = maxOf(0, (paging.pageNumber - 1) * paging.pageSize)
|
|
||||||
val pageSize = paging.pageSize + 1
|
|
||||||
query.maxResults = if (pageSize > 0) pageSize else Integer.MAX_VALUE // detection too many results, protected against overflow
|
|
||||||
|
|
||||||
// execution
|
|
||||||
val results = query.resultList
|
|
||||||
|
|
||||||
|
private fun <R> Query<R>.resultStream(paging: PageSpecification): Stream<R> {
|
||||||
|
return if (paging.isDefault) {
|
||||||
|
val allResults = resultList
|
||||||
// final pagination check (fail-fast on too many results when no pagination specified)
|
// final pagination check (fail-fast on too many results when no pagination specified)
|
||||||
if (!skipPagingChecks && paging.isDefault && results.size > DEFAULT_PAGE_SIZE) {
|
checkVaultQuery(allResults.size != paging.pageSize + 1) {
|
||||||
throw VaultQueryException("There are ${results.size} results, which exceeds the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. In order to retrieve these results, provide a `PageSpecification(pageNumber, pageSize)` to the method invoked.")
|
"There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " +
|
||||||
|
"In order to retrieve these results, provide a PageSpecification to the method invoked."
|
||||||
}
|
}
|
||||||
val statesAndRefs: MutableList<StateAndRef<T>> = mutableListOf()
|
allResults.stream()
|
||||||
val statesMeta: MutableList<Vault.StateMetadata> = mutableListOf()
|
} else {
|
||||||
val otherResults: MutableList<Any> = mutableListOf()
|
stream()
|
||||||
val stateRefs = mutableSetOf<StateRef>()
|
|
||||||
|
|
||||||
results.asSequence()
|
|
||||||
.forEachIndexed { index, result ->
|
|
||||||
if (result[0] is VaultSchemaV1.VaultStates) {
|
|
||||||
if (!paging.isDefault && index == paging.pageSize) // skip last result if paged
|
|
||||||
return@forEachIndexed
|
|
||||||
val vaultState = result[0] as VaultSchemaV1.VaultStates
|
|
||||||
val stateRef = StateRef(SecureHash.create(vaultState.stateRef!!.txId), vaultState.stateRef!!.index)
|
|
||||||
stateRefs.add(stateRef)
|
|
||||||
statesMeta.add(Vault.StateMetadata(stateRef,
|
|
||||||
vaultState.contractStateClassName,
|
|
||||||
vaultState.recordedTime,
|
|
||||||
vaultState.consumedTime,
|
|
||||||
vaultState.stateStatus,
|
|
||||||
vaultState.notary,
|
|
||||||
vaultState.lockId,
|
|
||||||
vaultState.lockUpdateTime,
|
|
||||||
vaultState.relevancyStatus,
|
|
||||||
constraintInfo(vaultState.constraintType, vaultState.constraintData)
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
// TODO: improve typing of returned other results
|
|
||||||
log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" }
|
|
||||||
otherResults.addAll(result.toArray().asList())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (stateRefs.isNotEmpty())
|
|
||||||
statesAndRefs.addAll(uncheckedCast(servicesForResolution.loadStates(stateRefs)))
|
|
||||||
|
|
||||||
Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun Query<*>.setResultWindow(paging: PageSpecification) {
|
||||||
|
if (paging.isDefault) {
|
||||||
|
// For both SQLServer and PostgresSQL, firstResult must be >= 0.
|
||||||
|
firstResult = 0
|
||||||
|
// Peek ahead and see if there are more results in case pagination should be done
|
||||||
|
maxResults = paging.pageSize + 1
|
||||||
|
} else {
|
||||||
|
firstResult = (paging.pageNumber - 1) * paging.pageSize
|
||||||
|
maxResults = paging.pageSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun <T : ContractState> queryTotalStateCount(baseCriteria: QueryCriteria, contractStateType: Class<out T>): Long {
|
||||||
|
val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() }
|
||||||
|
val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL)
|
||||||
|
val criteria = baseCriteria.and(countCriteria)
|
||||||
|
val (query) = createQuery(criteria, contractStateType, null)
|
||||||
|
val results = query.resultList
|
||||||
|
return results.last().toArray().last() as Long
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun <T : ContractState> createQuery(criteria: QueryCriteria,
|
||||||
|
contractStateType: Class<out T>,
|
||||||
|
sorting: Sort?): Pair<Query<Tuple>, Vault.StateStatus> {
|
||||||
|
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
||||||
|
val criteriaParser = HibernateQueryCriteriaParser(
|
||||||
|
contractStateType,
|
||||||
|
contractStateTypeMappings,
|
||||||
|
criteriaBuilder,
|
||||||
|
criteriaQuery,
|
||||||
|
criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||||
|
)
|
||||||
|
criteriaParser.parse(criteria, sorting)
|
||||||
|
val query = getSession().createQuery(criteriaQuery)
|
||||||
|
return Pair(query, criteriaParser.stateTypes)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a [DataFeed] containing the results of the provided query, along with the associated observable, containing any subsequent updates.
|
* Returns a [DataFeed] containing the results of the provided query, along with the associated observable, containing any subsequent updates.
|
||||||
*
|
*
|
||||||
@ -775,6 +820,12 @@ class NodeVaultService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private inline fun checkVaultQuery(value: Boolean, lazyMessage: () -> Any) {
|
||||||
|
if (!value) {
|
||||||
|
throw VaultQueryException(lazyMessage().toString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun <T : ContractState> filterContractStates(update: Vault.Update<T>, contractStateType: Class<out T>) =
|
private fun <T : ContractState> filterContractStates(update: Vault.Update<T>, contractStateType: Class<out T>) =
|
||||||
update.copy(consumed = filterByContractState(contractStateType, update.consumed),
|
update.copy(consumed = filterByContractState(contractStateType, update.consumed),
|
||||||
produced = filterByContractState(contractStateType, update.produced))
|
produced = filterByContractState(contractStateType, update.produced))
|
||||||
@ -802,6 +853,7 @@ class NodeVaultService(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun getSession() = database.currentOrNew().session
|
private fun getSession() = database.currentOrNew().session
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Derive list from existing vault states and then incrementally update using vault observables
|
* Derive list from existing vault states and then incrementally update using vault observables
|
||||||
*/
|
*/
|
||||||
|
@ -2,7 +2,9 @@ package net.corda.node.services.vault
|
|||||||
|
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.contracts.MAX_ISSUER_REF_SIZE
|
import net.corda.core.contracts.MAX_ISSUER_REF_SIZE
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.contracts.UniqueIdentifier
|
import net.corda.core.contracts.UniqueIdentifier
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
@ -192,3 +194,19 @@ object VaultSchemaV1 : MappedSchema(
|
|||||||
) : IndirectStatePersistable<PersistentStateRefAndKey>
|
) : IndirectStatePersistable<PersistentStateRefAndKey>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun PersistentStateRef.toStateRef(): StateRef = StateRef(SecureHash.create(txId), index)
|
||||||
|
|
||||||
|
fun VaultSchemaV1.VaultStates.toStateMetadata(): Vault.StateMetadata {
|
||||||
|
return Vault.StateMetadata(
|
||||||
|
stateRef!!.toStateRef(),
|
||||||
|
contractStateClassName,
|
||||||
|
recordedTime,
|
||||||
|
consumedTime,
|
||||||
|
stateStatus,
|
||||||
|
notary,
|
||||||
|
lockId,
|
||||||
|
lockUpdateTime,
|
||||||
|
relevancyStatus,
|
||||||
|
Vault.ConstraintInfo.constraintInfo(constraintType, constraintData)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
@ -21,6 +21,7 @@ import net.corda.core.utilities.getOrThrow
|
|||||||
import net.corda.core.utilities.unwrap
|
import net.corda.core.utilities.unwrap
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
@ -41,6 +42,8 @@ class BFTSmartNotaryService(
|
|||||||
) : NotaryService() {
|
) : NotaryService() {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
|
|
||||||
|
@Suppress("unused") // Used by NotaryLoader via reflection
|
||||||
@JvmStatic
|
@JvmStatic
|
||||||
val serializationFilter
|
val serializationFilter
|
||||||
get() = { clazz: Class<*> ->
|
get() = { clazz: Class<*> ->
|
||||||
@ -147,12 +150,7 @@ class BFTSmartNotaryService(
|
|||||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||||
fromPersistentEntity = {
|
fromPersistentEntity = {
|
||||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||||
val txId = it.id.txId
|
Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash))
|
||||||
val index = it.id.index
|
|
||||||
Pair(
|
|
||||||
StateRef(txhash = SecureHash.create(txId), index = index),
|
|
||||||
SecureHash.create(it.consumingTxHash)
|
|
||||||
)
|
|
||||||
},
|
},
|
||||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||||
CommittedState(
|
CommittedState(
|
||||||
|
@ -24,6 +24,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import net.corda.notary.common.InternalResult
|
import net.corda.notary.common.InternalResult
|
||||||
@ -142,10 +143,6 @@ class JPAUniquenessProvider(
|
|||||||
fun encodeStateRef(s: StateRef): PersistentStateRef {
|
fun encodeStateRef(s: StateRef): PersistentStateRef {
|
||||||
return PersistentStateRef(s.txhash.toString(), s.index)
|
return PersistentStateRef(s.txhash.toString(), s.index)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun decodeStateRef(s: PersistentStateRef): StateRef {
|
|
||||||
return StateRef(txhash = SecureHash.create(s.txId), index = s.index)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -215,15 +212,15 @@ class JPAUniquenessProvider(
|
|||||||
committedStates.addAll(existing)
|
committedStates.addAll(existing)
|
||||||
}
|
}
|
||||||
|
|
||||||
return committedStates.map {
|
return committedStates.associate {
|
||||||
val stateRef = StateRef(txhash = SecureHash.create(it.id.txId), index = it.id.index)
|
val stateRef = it.id.toStateRef()
|
||||||
val consumingTxId = SecureHash.create(it.consumingTxHash)
|
val consumingTxId = SecureHash.create(it.consumingTxHash)
|
||||||
if (stateRef in references) {
|
if (stateRef in references) {
|
||||||
stateRef to StateConsumptionDetails(consumingTxId.reHash(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
stateRef to StateConsumptionDetails(consumingTxId.reHash(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||||
} else {
|
} else {
|
||||||
stateRef to StateConsumptionDetails(consumingTxId.reHash())
|
stateRef to StateConsumptionDetails(consumingTxId.reHash())
|
||||||
}
|
}
|
||||||
}.toMap()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun<T> withRetry(block: () -> T): T {
|
private fun<T> withRetry(block: () -> T): T {
|
||||||
|
@ -28,12 +28,14 @@ import net.corda.finance.schemas.CashSchemaV1
|
|||||||
import net.corda.finance.test.SampleCashSchemaV1
|
import net.corda.finance.test.SampleCashSchemaV1
|
||||||
import net.corda.finance.test.SampleCashSchemaV2
|
import net.corda.finance.test.SampleCashSchemaV2
|
||||||
import net.corda.finance.test.SampleCashSchemaV3
|
import net.corda.finance.test.SampleCashSchemaV3
|
||||||
|
import net.corda.node.internal.NodeServicesForResolution
|
||||||
import net.corda.node.services.api.WritableTransactionStorage
|
import net.corda.node.services.api.WritableTransactionStorage
|
||||||
import net.corda.node.services.schema.ContractStateAndRef
|
import net.corda.node.services.schema.ContractStateAndRef
|
||||||
import net.corda.node.services.schema.NodeSchemaService
|
import net.corda.node.services.schema.NodeSchemaService
|
||||||
import net.corda.node.services.schema.PersistentStateService
|
import net.corda.node.services.schema.PersistentStateService
|
||||||
import net.corda.node.services.vault.NodeVaultService
|
import net.corda.node.services.vault.NodeVaultService
|
||||||
import net.corda.node.services.vault.VaultSchemaV1
|
import net.corda.node.services.vault.VaultSchemaV1
|
||||||
|
import net.corda.node.services.vault.toStateRef
|
||||||
import net.corda.node.testing.DummyFungibleContract
|
import net.corda.node.testing.DummyFungibleContract
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
@ -48,7 +50,6 @@ import net.corda.testing.internal.vault.VaultFiller
|
|||||||
import net.corda.testing.node.MockServices
|
import net.corda.testing.node.MockServices
|
||||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||||
import org.assertj.core.api.Assertions
|
import org.assertj.core.api.Assertions
|
||||||
import org.assertj.core.api.Assertions.`in`
|
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||||
import org.hibernate.SessionFactory
|
import org.hibernate.SessionFactory
|
||||||
@ -122,7 +123,14 @@ class HibernateConfigurationTest {
|
|||||||
services = object : MockServices(cordappPackages, BOB_NAME, mock<IdentityService>().also {
|
services = object : MockServices(cordappPackages, BOB_NAME, mock<IdentityService>().also {
|
||||||
doReturn(null).whenever(it).verifyAndRegisterIdentity(argThat { name == BOB_NAME })
|
doReturn(null).whenever(it).verifyAndRegisterIdentity(argThat { name == BOB_NAME })
|
||||||
}, generateKeyPair(), dummyNotary.keyPair) {
|
}, generateKeyPair(), dummyNotary.keyPair) {
|
||||||
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, cordappClassloader).apply { start() }
|
override val vaultService = NodeVaultService(
|
||||||
|
Clock.systemUTC(),
|
||||||
|
keyManagementService,
|
||||||
|
servicesForResolution as NodeServicesForResolution,
|
||||||
|
database,
|
||||||
|
schemaService,
|
||||||
|
cordappClassloader
|
||||||
|
).apply { start() }
|
||||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||||
for (stx in txs) {
|
for (stx in txs) {
|
||||||
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)
|
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)
|
||||||
@ -183,7 +191,7 @@ class HibernateConfigurationTest {
|
|||||||
// execute query
|
// execute query
|
||||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||||
val coins = queryResults.map {
|
val coins = queryResults.map {
|
||||||
services.loadState(toStateRef(it.stateRef!!)).data
|
services.loadState(it.stateRef!!.toStateRef()).data
|
||||||
}.sumCash()
|
}.sumCash()
|
||||||
assertThat(coins.toDecimal() >= BigDecimal("50.00"))
|
assertThat(coins.toDecimal() >= BigDecimal("50.00"))
|
||||||
}
|
}
|
||||||
@ -739,7 +747,7 @@ class HibernateConfigurationTest {
|
|||||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||||
|
|
||||||
queryResults.forEach {
|
queryResults.forEach {
|
||||||
val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State
|
val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State
|
||||||
println("${it.stateRef} with owner: ${cashState.owner.owningKey.toBase58String()}")
|
println("${it.stateRef} with owner: ${cashState.owner.owningKey.toBase58String()}")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -823,7 +831,7 @@ class HibernateConfigurationTest {
|
|||||||
// execute query
|
// execute query
|
||||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||||
queryResults.forEach {
|
queryResults.forEach {
|
||||||
val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State
|
val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State
|
||||||
println("${it.stateRef} with owner ${cashState.owner.owningKey.toBase58String()} and participants ${cashState.participants.map { it.owningKey.toBase58String() }}")
|
println("${it.stateRef} with owner ${cashState.owner.owningKey.toBase58String()} and participants ${cashState.participants.map { it.owningKey.toBase58String() }}")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -961,10 +969,6 @@ class HibernateConfigurationTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun toStateRef(pStateRef: PersistentStateRef): StateRef {
|
|
||||||
return StateRef(SecureHash.create(pStateRef.txId), pStateRef.index)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `schema change`() {
|
fun `schema change`() {
|
||||||
fun createNewDB(schemas: Set<MappedSchema>, initialiseSchema: Boolean = true): CordaPersistence {
|
fun createNewDB(schemas: Set<MappedSchema>, initialiseSchema: Boolean = true): CordaPersistence {
|
||||||
|
@ -1674,7 +1674,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
|
|||||||
|
|
||||||
// pagination: last page
|
// pagination: last page
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `all states with paging specification - last`() {
|
fun `all states with paging specification - last`() {
|
||||||
database.transaction {
|
database.transaction {
|
||||||
vaultFiller.fillWithSomeTestCash(95.DOLLARS, notaryServices, 95, DUMMY_CASH_ISSUER)
|
vaultFiller.fillWithSomeTestCash(95.DOLLARS, notaryServices, 95, DUMMY_CASH_ISSUER)
|
||||||
// Last page implies we need to perform a row count for the Query first,
|
// Last page implies we need to perform a row count for the Query first,
|
||||||
@ -1723,7 +1723,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
|
|||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `pagination not specified but more than default results available`() {
|
fun `pagination not specified but more than default results available`() {
|
||||||
expectedEx.expect(VaultQueryException::class.java)
|
expectedEx.expect(VaultQueryException::class.java)
|
||||||
expectedEx.expectMessage("provide a `PageSpecification(pageNumber, pageSize)`")
|
expectedEx.expectMessage("provide a PageSpecification")
|
||||||
|
|
||||||
database.transaction {
|
database.transaction {
|
||||||
vaultFiller.fillWithSomeTestCash(201.DOLLARS, notaryServices, 201, DUMMY_CASH_ISSUER)
|
vaultFiller.fillWithSomeTestCash(201.DOLLARS, notaryServices, 201, DUMMY_CASH_ISSUER)
|
||||||
|
@ -10,7 +10,6 @@ import net.corda.core.flows.InitiatingFlow
|
|||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.internal.FlowStateMachine
|
import net.corda.core.internal.FlowStateMachine
|
||||||
import net.corda.core.internal.uncheckedCast
|
import net.corda.core.internal.uncheckedCast
|
||||||
import net.corda.core.node.ServicesForResolution
|
|
||||||
import net.corda.core.node.services.KeyManagementService
|
import net.corda.core.node.services.KeyManagementService
|
||||||
import net.corda.core.node.services.queryBy
|
import net.corda.core.node.services.queryBy
|
||||||
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition
|
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition
|
||||||
@ -29,6 +28,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
|||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
import net.corda.testing.flows.registerCoreFlowFactory
|
import net.corda.testing.flows.registerCoreFlowFactory
|
||||||
import net.corda.coretesting.internal.rigorousMock
|
import net.corda.coretesting.internal.rigorousMock
|
||||||
|
import net.corda.node.internal.NodeServicesForResolution
|
||||||
import net.corda.testing.node.internal.InternalMockNetwork
|
import net.corda.testing.node.internal.InternalMockNetwork
|
||||||
import net.corda.testing.node.internal.enclosedCordapp
|
import net.corda.testing.node.internal.enclosedCordapp
|
||||||
import net.corda.testing.node.internal.startFlow
|
import net.corda.testing.node.internal.startFlow
|
||||||
@ -86,7 +86,7 @@ class VaultSoftLockManagerTest {
|
|||||||
private val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(enclosedCordapp()), defaultFactory = { args ->
|
private val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(enclosedCordapp()), defaultFactory = { args ->
|
||||||
object : InternalMockNetwork.MockNode(args) {
|
object : InternalMockNetwork.MockNode(args) {
|
||||||
override fun makeVaultService(keyManagementService: KeyManagementService,
|
override fun makeVaultService(keyManagementService: KeyManagementService,
|
||||||
services: ServicesForResolution,
|
services: NodeServicesForResolution,
|
||||||
database: CordaPersistence,
|
database: CordaPersistence,
|
||||||
cordappLoader: CordappLoader): VaultServiceInternal {
|
cordappLoader: CordappLoader): VaultServiceInternal {
|
||||||
val node = this
|
val node = this
|
||||||
|
@ -26,6 +26,7 @@ import net.corda.core.transactions.SignedTransaction
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.ServicesForResolutionImpl
|
import net.corda.node.internal.ServicesForResolutionImpl
|
||||||
|
import net.corda.node.internal.NodeServicesForResolution
|
||||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||||
import net.corda.node.services.api.*
|
import net.corda.node.services.api.*
|
||||||
import net.corda.node.services.diagnostics.NodeDiagnosticsService
|
import net.corda.node.services.diagnostics.NodeDiagnosticsService
|
||||||
@ -460,7 +461,14 @@ open class MockServices private constructor(
|
|||||||
get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersService, validatedTransactions)
|
get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersService, validatedTransactions)
|
||||||
|
|
||||||
internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal {
|
internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal {
|
||||||
return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, cordappLoader.appClassLoader).apply { start() }
|
return NodeVaultService(
|
||||||
|
clock,
|
||||||
|
keyManagementService,
|
||||||
|
servicesForResolution as NodeServicesForResolution,
|
||||||
|
database,
|
||||||
|
schemaService,
|
||||||
|
cordappLoader.appClassLoader
|
||||||
|
).apply { start() }
|
||||||
}
|
}
|
||||||
|
|
||||||
// This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API
|
// This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API
|
||||||
|
Reference in New Issue
Block a user