diff --git a/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt index 986b6052ef..e0f3778418 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt @@ -24,8 +24,9 @@ import net.corda.core.node.NetworkParameters import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.internal.AttachmentsClassLoader import net.corda.core.serialization.internal.AttachmentsClassLoaderCacheImpl -import net.corda.testing.common.internal.testNetworkParameters +import net.corda.core.transactions.LedgerTransaction import net.corda.node.services.attachments.NodeAttachmentTrustCalculator +import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME @@ -74,7 +75,7 @@ class AttachmentsClassLoaderTests { val BOB = TestIdentity(BOB_NAME, 80).party val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20) val DUMMY_NOTARY get() = dummyNotary.party - val PROGRAM_ID: String = "net.corda.testing.contracts.MyDummyContract" + const val PROGRAM_ID = "net.corda.testing.contracts.MyDummyContract" } @Rule @@ -89,7 +90,7 @@ class AttachmentsClassLoaderTests { private lateinit var internalStorage: InternalMockAttachmentStorage private lateinit var attachmentTrustCalculator: AttachmentTrustCalculator private val networkParameters = testNetworkParameters() - private val cacheFactory = TestingNamedCacheFactory() + private val cacheFactory = TestingNamedCacheFactory(1) private fun createClassloader( attachment: AttachmentId, @@ -541,6 +542,50 @@ class AttachmentsClassLoaderTests { } } + @Test(timeout=300_000) + fun `class loader not closed after cache starts evicting`() { + tempFolder.root.toPath().let { path -> + val transactions = mutableListOf() + val iterations = 10 + + val baseOutState = TransactionState(DummyContract.SingleOwnerState(0, ALICE), PROGRAM_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint) + val inputs = emptyList>() + val outputs = listOf(baseOutState, baseOutState.copy(notary = ALICE), baseOutState.copy(notary = BOB)) + val commands = emptyList>() + val content = createContractString(PROGRAM_ID) + val timeWindow: TimeWindow? = null + val attachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(cacheFactory) + val contractJarPath = ContractJarTestUtils.makeTestContractJar(path, PROGRAM_ID, content = content) + val attachments = createAttachments(contractJarPath) + + for(i in 1 .. iterations) { + val id = SecureHash.randomSHA256() + val privacySalt = PrivacySalt() + val transaction = createLedgerTransaction( + inputs, + outputs, + commands, + attachments, + id, + null, + timeWindow, + privacySalt, + testNetworkParameters(), + emptyList(), + isAttachmentTrusted = { true }, + attachmentsClassLoaderCache = attachmentsClassLoaderCache + ) + transactions.add(transaction) + System.gc() + Thread.sleep(1) + } + + transactions.forEach { + it.verify() + } + } + } + private fun createContractString(contractName: String, versionSeed: Int = 0): String { val pkgs = contractName.split(".") val className = pkgs.last() @@ -563,7 +608,7 @@ class AttachmentsClassLoaderTests { } """.trimIndent() - System.out.println(output) + println(output) return output } @@ -571,6 +616,7 @@ class AttachmentsClassLoaderTests { val attachment = object : AbstractAttachment({contractJarPath.inputStream().readBytes()}, uploader = "app") { @Suppress("OverridingDeprecatedMember") + @Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.") override val signers: List = emptyList() override val signerKeys: List = emptyList() override val size: Int = 1234 @@ -581,6 +627,7 @@ class AttachmentsClassLoaderTests { return listOf( object : AbstractAttachment({ISOLATED_CONTRACTS_JAR_PATH.openStream().readBytes()}, uploader = "app") { @Suppress("OverridingDeprecatedMember") + @Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.") override val signers: List = emptyList() override val signerKeys: List = emptyList() override val size: Int = 1234 @@ -589,6 +636,7 @@ class AttachmentsClassLoaderTests { object : AbstractAttachment({fakeAttachment("importantDoc.pdf", "I am a pdf!").inputStream().readBytes() }, uploader = "app") { @Suppress("OverridingDeprecatedMember") + @Deprecated("Use signerKeys. There is no requirement that attachment signers are Corda parties.") override val signers: List = emptyList() override val signerKeys: List = emptyList() override val size: Int = 1234 diff --git a/core/src/main/kotlin/net/corda/core/internal/TransactionVerifierServiceInternal.kt b/core/src/main/kotlin/net/corda/core/internal/TransactionVerifierServiceInternal.kt index 58d647af6f..0171d71e91 100644 --- a/core/src/main/kotlin/net/corda/core/internal/TransactionVerifierServiceInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/TransactionVerifierServiceInternal.kt @@ -16,9 +16,9 @@ import net.corda.core.contracts.TransactionState import net.corda.core.contracts.TransactionVerificationException import net.corda.core.contracts.TransactionVerificationException.ConflictingAttachmentsRejection import net.corda.core.contracts.TransactionVerificationException.ConstraintPropagationRejection +import net.corda.core.contracts.TransactionVerificationException.ContractConstraintRejection import net.corda.core.contracts.TransactionVerificationException.ContractCreationError import net.corda.core.contracts.TransactionVerificationException.ContractRejection -import net.corda.core.contracts.TransactionVerificationException.ContractConstraintRejection import net.corda.core.contracts.TransactionVerificationException.Direction import net.corda.core.contracts.TransactionVerificationException.DuplicateAttachmentsRejection import net.corda.core.contracts.TransactionVerificationException.InvalidConstraintRejection diff --git a/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt b/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt index 6c8279a1e8..83de5fe059 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt @@ -35,6 +35,7 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import java.io.IOException import java.io.InputStream +import java.lang.ref.ReferenceQueue import java.lang.ref.WeakReference import java.net.URL import java.net.URLClassLoader @@ -46,6 +47,8 @@ import java.security.Permission import java.util.Locale import java.util.ServiceLoader import java.util.WeakHashMap +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong import java.util.function.Function /** @@ -333,6 +336,7 @@ class AttachmentsClassLoader(attachments: List, @VisibleForTesting object AttachmentsClassLoaderBuilder { private const val CACHE_SIZE = 16 + private const val STRONG_REFERENCE_TO_CACHED_SERIALIZATION_CONTEXT = "cachedSerializationContext" private val fallBackCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderSimpleCacheImpl(CACHE_SIZE) @@ -352,16 +356,15 @@ object AttachmentsClassLoaderBuilder { val attachmentIds = attachments.mapTo(LinkedHashSet(), Attachment::id) val cache = attachmentsClassLoaderCache ?: fallBackCache - val serializationContext = cache.computeIfAbsent(AttachmentsClassLoaderKey(attachmentIds, params), Function { key -> + val cachedSerializationContext = cache.computeIfAbsent(AttachmentsClassLoaderKey(attachmentIds, params), Function { key -> // Create classloader and load serializers, whitelisted classes val transactionClassLoader = AttachmentsClassLoader(attachments, key.params, txId, isAttachmentTrusted, parent) val serializers = try { createInstancesOfClassesImplementing(transactionClassLoader, SerializationCustomSerializer::class.java, JDK1_2_CLASS_FILE_FORMAT_MAJOR_VERSION..JDK8_CLASS_FILE_FORMAT_MAJOR_VERSION) - } - catch(ex: UnsupportedClassVersionError) { - throw TransactionVerificationException.UnsupportedClassVersionError(txId, ex.message!!, ex) - } + } catch (ex: UnsupportedClassVersionError) { + throw TransactionVerificationException.UnsupportedClassVersionError(txId, ex.message!!, ex) + } val whitelistedClasses = ServiceLoader.load(SerializationWhitelist::class.java, transactionClassLoader) .flatMap(SerializationWhitelist::whitelist) @@ -375,11 +378,17 @@ object AttachmentsClassLoaderBuilder { .withWhitelist(whitelistedClasses) .withCustomSerializers(serializers) .withoutCarpenter() - }).withProperties(mapOf( - // Duplicate the SerializationContext from the cache and give - // it these extra properties, just for this transaction. - AMQP_ENVELOPE_CACHE_PROPERTY to HashMap(AMQP_ENVELOPE_CACHE_INITIAL_CAPACITY), - DESERIALIZATION_CACHE_PROPERTY to HashMap() + }) + + val serializationContext = cachedSerializationContext.withProperties(mapOf( + // Duplicate the SerializationContext from the cache and give + // it these extra properties, just for this transaction. + // However, keep a strong reference to the cached SerializationContext so we can + // leverage the power of WeakReferences in the AttachmentsClassLoaderCacheImpl to figure + // out when all these have gone out of scope by the BasicVerifier going out of scope. + AMQP_ENVELOPE_CACHE_PROPERTY to HashMap(AMQP_ENVELOPE_CACHE_INITIAL_CAPACITY), + DESERIALIZATION_CACHE_PROPERTY to HashMap(), + STRONG_REFERENCE_TO_CACHED_SERIALIZATION_CONTEXT to cachedSerializationContext )) // Deserialize all relevant classes in the transaction classloader. @@ -396,6 +405,8 @@ object AttachmentsClassLoaderBuilder { object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory { internal const val attachmentScheme = "attachment" + private val uniqueness = AtomicLong(0) + private val loadedAttachments: AttachmentsHolder = AttachmentsHolderImpl() override fun createURLStreamHandler(protocol: String): URLStreamHandler? { @@ -406,14 +417,9 @@ object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory { @Synchronized fun toUrl(attachment: Attachment): URL { - val proposedURL = URL(attachmentScheme, "", -1, attachment.id.toString(), AttachmentURLStreamHandler) - val existingURL = loadedAttachments.getKey(proposedURL) - return if (existingURL == null) { - loadedAttachments[proposedURL] = attachment - proposedURL - } else { - existingURL - } + val uniqueURL = URL(attachmentScheme, "", -1, attachment.id.toString()+ "?" + uniqueness.getAndIncrement(), AttachmentURLStreamHandler) + loadedAttachments[uniqueURL] = attachment + return uniqueURL } @VisibleForTesting @@ -471,20 +477,52 @@ interface AttachmentsClassLoaderCache { @DeleteForDJVM class AttachmentsClassLoaderCacheImpl(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), AttachmentsClassLoaderCache { - private val cache: Cache = cacheFactory.buildNamed( - // Close deserialization classloaders when we evict them - // to release any resources they may be holding. - @Suppress("TooGenericExceptionCaught") - Caffeine.newBuilder().removalListener { key, context, _ -> - try { - (context?.deserializationClassLoader as? AutoCloseable)?.close() - } catch (e: Exception) { - loggerFor().warn("Error destroying serialization context for $key", e) + private class ToBeClosed( + serializationContext: SerializationContext, + val classLoaderToClose: AutoCloseable, + val cacheKey: AttachmentsClassLoaderKey, + queue: ReferenceQueue + ) : WeakReference(serializationContext, queue) + + private val logger = loggerFor() + private val toBeClosed = ConcurrentHashMap.newKeySet() + private val expiryQueue = ReferenceQueue() + + @Suppress("TooGenericExceptionCaught") + private fun purgeExpiryQueue() { + // Close the AttachmentsClassLoader for every SerializationContext + // that has already been garbage-collected. + while (true) { + val head = expiryQueue.poll() as? ToBeClosed ?: break + if (!toBeClosed.remove(head)) { + logger.warn("Reaped unexpected serialization context for {}", head.cacheKey) } - }, "AttachmentsClassLoader_cache" + + try { + head.classLoaderToClose.close() + } catch (e: Exception) { + logger.warn("Error destroying serialization context for ${head.cacheKey}", e) + } + } + } + + private val cache: Cache = cacheFactory.buildNamed( + // Schedule for closing the deserialization classloaders when we evict them + // to release any resources they may be holding. + Caffeine.newBuilder().removalListener { key, context, _ -> + (context?.deserializationClassLoader as? AutoCloseable)?.also { autoCloseable -> + // ClassLoader to be closed once the BasicVerifier, which has a strong + // reference chain to this SerializationContext, has gone out of scope. + toBeClosed += ToBeClosed(context, autoCloseable, key!!, expiryQueue) + } + + // Reap any entries which have been garbage-collected. + purgeExpiryQueue() + }, "AttachmentsClassLoader_cache" ) override fun computeIfAbsent(key: AttachmentsClassLoaderKey, mappingFunction: Function): SerializationContext { + purgeExpiryQueue() return cache.get(key, mappingFunction) ?: throw NullPointerException("null returned from cache mapping function") } }