Hack around database-with-observables issue

This commit is contained in:
Andras Slemmer 2018-02-14 17:30:32 +00:00
parent 81b16776f3
commit 6a4f783106
2 changed files with 17 additions and 8 deletions

View File

@ -43,7 +43,10 @@ enum class TransactionIsolationLevel {
}
private val _contextDatabase = ThreadLocal<CordaPersistence>()
val contextDatabase get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
var contextDatabase: CordaPersistence
get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
set(database) = _contextDatabase.set(database)
val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
class CordaPersistence(
val dataSource: DataSource,

View File

@ -31,6 +31,9 @@ import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.externalTrace
import net.corda.nodeapi.impersonatedActor
import net.corda.nodeapi.internal.DeduplicationChecker
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
@ -57,16 +60,13 @@ data class RPCServerConfiguration(
/** The interval of subscription reaping */
val reapInterval: Duration,
/** The cache expiry of a deduplication watermark per client. */
val deduplicationCacheExpiry: Duration,
/** The size of the send queue */
val sendJobQueueSize: Int
val deduplicationCacheExpiry: Duration
) {
companion object {
val default = RPCServerConfiguration(
rpcThreadPoolSize = 4,
reapInterval = 1.seconds,
deduplicationCacheExpiry = 1.days,
sendJobQueueSize = 256
deduplicationCacheExpiry = 1.days
)
}
}
@ -129,7 +129,7 @@ class RPCServer(
private var serverControl: ActiveMQServerControl? = null
private val responseMessageBuffer = ConcurrentHashMap<SimpleString, BufferOrNone>()
private val sendJobQueue = ArrayBlockingQueue<RpcSendJob>(rpcConfiguration.sendJobQueueSize)
private val sendJobQueue = LinkedBlockingQueue<RpcSendJob>()
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private var deduplicationIdentity: String? = null
@ -222,6 +222,9 @@ class RPCServer(
private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) {
try {
val artemisMessage = producerSession!!.createMessage(false)
if (job.database != null) {
contextDatabase = job.database
}
// We must do the serialisation here as any encountered Observables may already have events, which would
// trigger more sends. We must make sure that the root of the Observables (e.g. the RPC reply) is sent
// before any child observations.
@ -418,12 +421,15 @@ class RPCServer(
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
sendJobQueue.put(RpcSendJob.Send(clientAddress, serializationContextWithObservableContext, serverToClient))
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient))
}
}
private sealed class RpcSendJob {
data class Send(
// TODO HACK this is because during serialisation we subscribe to observables that may use
// DatabaseTransactionWrappingSubscriber which tries to access the current database,
val database: CordaPersistence?,
val clientAddress: SimpleString,
val serializationContext: SerializationContext,
val message: RPCApi.ServerToClient