diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 9a2b2c3b64..4884c21832 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -43,7 +43,10 @@ enum class TransactionIsolationLevel { } private val _contextDatabase = ThreadLocal() -val contextDatabase get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}") +var contextDatabase: CordaPersistence + get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}") + set(database) = _contextDatabase.set(database) +val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get() class CordaPersistence( val dataSource: DataSource, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index deac23fa84..58788db721 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -31,6 +31,9 @@ import net.corda.nodeapi.RPCApi import net.corda.nodeapi.externalTrace import net.corda.nodeapi.impersonatedActor import net.corda.nodeapi.internal.DeduplicationChecker +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.contextDatabase +import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* @@ -57,16 +60,13 @@ data class RPCServerConfiguration( /** The interval of subscription reaping */ val reapInterval: Duration, /** The cache expiry of a deduplication watermark per client. */ - val deduplicationCacheExpiry: Duration, - /** The size of the send queue */ - val sendJobQueueSize: Int + val deduplicationCacheExpiry: Duration ) { companion object { val default = RPCServerConfiguration( rpcThreadPoolSize = 4, reapInterval = 1.seconds, - deduplicationCacheExpiry = 1.days, - sendJobQueueSize = 256 + deduplicationCacheExpiry = 1.days ) } } @@ -129,7 +129,7 @@ class RPCServer( private var serverControl: ActiveMQServerControl? = null private val responseMessageBuffer = ConcurrentHashMap() - private val sendJobQueue = ArrayBlockingQueue(rpcConfiguration.sendJobQueueSize) + private val sendJobQueue = LinkedBlockingQueue() private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry) private var deduplicationIdentity: String? = null @@ -222,6 +222,9 @@ class RPCServer( private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) { try { val artemisMessage = producerSession!!.createMessage(false) + if (job.database != null) { + contextDatabase = job.database + } // We must do the serialisation here as any encountered Observables may already have events, which would // trigger more sends. We must make sure that the root of the Observables (e.g. the RPC reply) is sent // before any child observations. @@ -418,12 +421,15 @@ class RPCServer( private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this) fun sendMessage(serverToClient: RPCApi.ServerToClient) { - sendJobQueue.put(RpcSendJob.Send(clientAddress, serializationContextWithObservableContext, serverToClient)) + sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient)) } } private sealed class RpcSendJob { data class Send( + // TODO HACK this is because during serialisation we subscribe to observables that may use + // DatabaseTransactionWrappingSubscriber which tries to access the current database, + val database: CordaPersistence?, val clientAddress: SimpleString, val serializationContext: SerializationContext, val message: RPCApi.ServerToClient