ENT-8794 Delay closing of attachment class loaders (#7267)

* Delay closing of attachment class loaders until all SerializationContext that refer to them (from BasicVerifier) have gone out of scope.

More comments

* Avoid any concurrency issues with queue processing

* Better concurrency behaviour

* Stop re-using attachment URLs as it turns out we can't close the URLClassLoaders when URLs are shared

* Refactor to use a ReferenceQueue.

Co-authored-by: Chris Rankin <chris.rankin@r3.com>
This commit is contained in:
Rick Parker 2022-11-15 09:25:54 +00:00 committed by GitHub
parent ea9f29e0bd
commit 43168387b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 33 deletions

View File

@ -24,8 +24,9 @@ import net.corda.core.node.NetworkParameters
import net.corda.core.node.services.AttachmentId
import net.corda.core.serialization.internal.AttachmentsClassLoader
import net.corda.core.serialization.internal.AttachmentsClassLoaderCacheImpl
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.core.transactions.LedgerTransaction
import net.corda.node.services.attachments.NodeAttachmentTrustCalculator
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
@ -74,7 +75,7 @@ class AttachmentsClassLoaderTests {
val BOB = TestIdentity(BOB_NAME, 80).party
val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20)
val DUMMY_NOTARY get() = dummyNotary.party
val PROGRAM_ID: String = "net.corda.testing.contracts.MyDummyContract"
const val PROGRAM_ID = "net.corda.testing.contracts.MyDummyContract"
}
@Rule
@ -89,7 +90,7 @@ class AttachmentsClassLoaderTests {
private lateinit var internalStorage: InternalMockAttachmentStorage
private lateinit var attachmentTrustCalculator: AttachmentTrustCalculator
private val networkParameters = testNetworkParameters()
private val cacheFactory = TestingNamedCacheFactory()
private val cacheFactory = TestingNamedCacheFactory(1)
private fun createClassloader(
attachment: AttachmentId,
@ -541,6 +542,50 @@ class AttachmentsClassLoaderTests {
}
}
@Test(timeout=300_000)
fun `class loader not closed after cache starts evicting`() {
tempFolder.root.toPath().let { path ->
val transactions = mutableListOf<LedgerTransaction>()
val iterations = 10
val baseOutState = TransactionState(DummyContract.SingleOwnerState(0, ALICE), PROGRAM_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint)
val inputs = emptyList<StateAndRef<*>>()
val outputs = listOf(baseOutState, baseOutState.copy(notary = ALICE), baseOutState.copy(notary = BOB))
val commands = emptyList<CommandWithParties<CommandData>>()
val content = createContractString(PROGRAM_ID)
val timeWindow: TimeWindow? = null
val attachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(cacheFactory)
val contractJarPath = ContractJarTestUtils.makeTestContractJar(path, PROGRAM_ID, content = content)
val attachments = createAttachments(contractJarPath)
for(i in 1 .. iterations) {
val id = SecureHash.randomSHA256()
val privacySalt = PrivacySalt()
val transaction = createLedgerTransaction(
inputs,
outputs,
commands,
attachments,
id,
null,
timeWindow,
privacySalt,
testNetworkParameters(),
emptyList(),
isAttachmentTrusted = { true },
attachmentsClassLoaderCache = attachmentsClassLoaderCache
)
transactions.add(transaction)
System.gc()
Thread.sleep(1)
}
transactions.forEach {
it.verify()
}
}
}
private fun createContractString(contractName: String, versionSeed: Int = 0): String {
val pkgs = contractName.split(".")
val className = pkgs.last()
@ -563,7 +608,7 @@ class AttachmentsClassLoaderTests {
}
""".trimIndent()
System.out.println(output)
println(output)
return output
}
@ -571,6 +616,7 @@ class AttachmentsClassLoaderTests {
val attachment = object : AbstractAttachment({contractJarPath.inputStream().readBytes()}, uploader = "app") {
@Suppress("OverridingDeprecatedMember")
@Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.")
override val signers: List<Party> = emptyList()
override val signerKeys: List<PublicKey> = emptyList()
override val size: Int = 1234
@ -581,6 +627,7 @@ class AttachmentsClassLoaderTests {
return listOf(
object : AbstractAttachment({ISOLATED_CONTRACTS_JAR_PATH.openStream().readBytes()}, uploader = "app") {
@Suppress("OverridingDeprecatedMember")
@Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.")
override val signers: List<Party> = emptyList()
override val signerKeys: List<PublicKey> = emptyList()
override val size: Int = 1234
@ -589,6 +636,7 @@ class AttachmentsClassLoaderTests {
object : AbstractAttachment({fakeAttachment("importantDoc.pdf", "I am a pdf!").inputStream().readBytes()
}, uploader = "app") {
@Suppress("OverridingDeprecatedMember")
@Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.")
override val signers: List<Party> = emptyList()
override val signerKeys: List<PublicKey> = emptyList()
override val size: Int = 1234

View File

@ -16,9 +16,9 @@ import net.corda.core.contracts.TransactionState
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.contracts.TransactionVerificationException.ConflictingAttachmentsRejection
import net.corda.core.contracts.TransactionVerificationException.ConstraintPropagationRejection
import net.corda.core.contracts.TransactionVerificationException.ContractConstraintRejection
import net.corda.core.contracts.TransactionVerificationException.ContractCreationError
import net.corda.core.contracts.TransactionVerificationException.ContractRejection
import net.corda.core.contracts.TransactionVerificationException.ContractConstraintRejection
import net.corda.core.contracts.TransactionVerificationException.Direction
import net.corda.core.contracts.TransactionVerificationException.DuplicateAttachmentsRejection
import net.corda.core.contracts.TransactionVerificationException.InvalidConstraintRejection

View File

@ -35,6 +35,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import java.io.IOException
import java.io.InputStream
import java.lang.ref.ReferenceQueue
import java.lang.ref.WeakReference
import java.net.URL
import java.net.URLClassLoader
@ -46,6 +47,8 @@ import java.security.Permission
import java.util.Locale
import java.util.ServiceLoader
import java.util.WeakHashMap
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Function
/**
@ -333,6 +336,7 @@ class AttachmentsClassLoader(attachments: List<Attachment>,
@VisibleForTesting
object AttachmentsClassLoaderBuilder {
private const val CACHE_SIZE = 16
private const val STRONG_REFERENCE_TO_CACHED_SERIALIZATION_CONTEXT = "cachedSerializationContext"
private val fallBackCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderSimpleCacheImpl(CACHE_SIZE)
@ -352,16 +356,15 @@ object AttachmentsClassLoaderBuilder {
val attachmentIds = attachments.mapTo(LinkedHashSet(), Attachment::id)
val cache = attachmentsClassLoaderCache ?: fallBackCache
val serializationContext = cache.computeIfAbsent(AttachmentsClassLoaderKey(attachmentIds, params), Function { key ->
val cachedSerializationContext = cache.computeIfAbsent(AttachmentsClassLoaderKey(attachmentIds, params), Function { key ->
// Create classloader and load serializers, whitelisted classes
val transactionClassLoader = AttachmentsClassLoader(attachments, key.params, txId, isAttachmentTrusted, parent)
val serializers = try {
createInstancesOfClassesImplementing(transactionClassLoader, SerializationCustomSerializer::class.java,
JDK1_2_CLASS_FILE_FORMAT_MAJOR_VERSION..JDK8_CLASS_FILE_FORMAT_MAJOR_VERSION)
}
catch(ex: UnsupportedClassVersionError) {
throw TransactionVerificationException.UnsupportedClassVersionError(txId, ex.message!!, ex)
}
} catch (ex: UnsupportedClassVersionError) {
throw TransactionVerificationException.UnsupportedClassVersionError(txId, ex.message!!, ex)
}
val whitelistedClasses = ServiceLoader.load(SerializationWhitelist::class.java, transactionClassLoader)
.flatMap(SerializationWhitelist::whitelist)
@ -375,11 +378,17 @@ object AttachmentsClassLoaderBuilder {
.withWhitelist(whitelistedClasses)
.withCustomSerializers(serializers)
.withoutCarpenter()
}).withProperties(mapOf<Any, Any>(
// Duplicate the SerializationContext from the cache and give
// it these extra properties, just for this transaction.
AMQP_ENVELOPE_CACHE_PROPERTY to HashMap<Any, Any>(AMQP_ENVELOPE_CACHE_INITIAL_CAPACITY),
DESERIALIZATION_CACHE_PROPERTY to HashMap<Any, Any>()
})
val serializationContext = cachedSerializationContext.withProperties(mapOf<Any, Any>(
// Duplicate the SerializationContext from the cache and give
// it these extra properties, just for this transaction.
// However, keep a strong reference to the cached SerializationContext so we can
// leverage the power of WeakReferences in the AttachmentsClassLoaderCacheImpl to figure
// out when all these have gone out of scope by the BasicVerifier going out of scope.
AMQP_ENVELOPE_CACHE_PROPERTY to HashMap<Any, Any>(AMQP_ENVELOPE_CACHE_INITIAL_CAPACITY),
DESERIALIZATION_CACHE_PROPERTY to HashMap<Any, Any>(),
STRONG_REFERENCE_TO_CACHED_SERIALIZATION_CONTEXT to cachedSerializationContext
))
// Deserialize all relevant classes in the transaction classloader.
@ -396,6 +405,8 @@ object AttachmentsClassLoaderBuilder {
object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory {
internal const val attachmentScheme = "attachment"
private val uniqueness = AtomicLong(0)
private val loadedAttachments: AttachmentsHolder = AttachmentsHolderImpl()
override fun createURLStreamHandler(protocol: String): URLStreamHandler? {
@ -406,14 +417,9 @@ object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory {
@Synchronized
fun toUrl(attachment: Attachment): URL {
val proposedURL = URL(attachmentScheme, "", -1, attachment.id.toString(), AttachmentURLStreamHandler)
val existingURL = loadedAttachments.getKey(proposedURL)
return if (existingURL == null) {
loadedAttachments[proposedURL] = attachment
proposedURL
} else {
existingURL
}
val uniqueURL = URL(attachmentScheme, "", -1, attachment.id.toString()+ "?" + uniqueness.getAndIncrement(), AttachmentURLStreamHandler)
loadedAttachments[uniqueURL] = attachment
return uniqueURL
}
@VisibleForTesting
@ -471,20 +477,52 @@ interface AttachmentsClassLoaderCache {
@DeleteForDJVM
class AttachmentsClassLoaderCacheImpl(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), AttachmentsClassLoaderCache {
private val cache: Cache<AttachmentsClassLoaderKey, SerializationContext> = cacheFactory.buildNamed(
// Close deserialization classloaders when we evict them
// to release any resources they may be holding.
@Suppress("TooGenericExceptionCaught")
Caffeine.newBuilder().removalListener { key, context, _ ->
try {
(context?.deserializationClassLoader as? AutoCloseable)?.close()
} catch (e: Exception) {
loggerFor<AttachmentsClassLoaderCacheImpl>().warn("Error destroying serialization context for $key", e)
private class ToBeClosed(
serializationContext: SerializationContext,
val classLoaderToClose: AutoCloseable,
val cacheKey: AttachmentsClassLoaderKey,
queue: ReferenceQueue<SerializationContext>
) : WeakReference<SerializationContext>(serializationContext, queue)
private val logger = loggerFor<AttachmentsClassLoaderCacheImpl>()
private val toBeClosed = ConcurrentHashMap.newKeySet<ToBeClosed>()
private val expiryQueue = ReferenceQueue<SerializationContext>()
@Suppress("TooGenericExceptionCaught")
private fun purgeExpiryQueue() {
// Close the AttachmentsClassLoader for every SerializationContext
// that has already been garbage-collected.
while (true) {
val head = expiryQueue.poll() as? ToBeClosed ?: break
if (!toBeClosed.remove(head)) {
logger.warn("Reaped unexpected serialization context for {}", head.cacheKey)
}
}, "AttachmentsClassLoader_cache"
try {
head.classLoaderToClose.close()
} catch (e: Exception) {
logger.warn("Error destroying serialization context for ${head.cacheKey}", e)
}
}
}
private val cache: Cache<AttachmentsClassLoaderKey, SerializationContext> = cacheFactory.buildNamed(
// Schedule for closing the deserialization classloaders when we evict them
// to release any resources they may be holding.
Caffeine.newBuilder().removalListener { key, context, _ ->
(context?.deserializationClassLoader as? AutoCloseable)?.also { autoCloseable ->
// ClassLoader to be closed once the BasicVerifier, which has a strong
// reference chain to this SerializationContext, has gone out of scope.
toBeClosed += ToBeClosed(context, autoCloseable, key!!, expiryQueue)
}
// Reap any entries which have been garbage-collected.
purgeExpiryQueue()
}, "AttachmentsClassLoader_cache"
)
override fun computeIfAbsent(key: AttachmentsClassLoaderKey, mappingFunction: Function<in AttachmentsClassLoaderKey, out SerializationContext>): SerializationContext {
purgeExpiryQueue()
return cache.get(key, mappingFunction) ?: throw NullPointerException("null returned from cache mapping function")
}
}