ENT-2848 Add caching to contract attachment versions (#4410)

* Refactor into attachment service

Fix up mock service

First caching version, but with no invalidation currently

Set cache size

Fix up after rebase

Cache invalidation

Formatting tidy up

Sort out some nullability

Add kdocs.

Unit tests

More unit tests

Fix TODO

Unit test fixes

Unit test fixes

Fixed concurrent invalidating transaction support.

* Correct some transaction concurrency bug, including unit test.

* Added some unit tests for the method I added to persistence.

* Remove some blank lines

* Review feedback

* Fix imports
This commit is contained in:
Rick Parker 2018-12-17 15:14:14 +00:00 committed by GitHub
parent fe3182d22f
commit 20e5bbf56f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 625 additions and 42 deletions

View File

@ -3,7 +3,8 @@ package net.corda.core.node.services
import net.corda.core.DoNotImplement
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import java.io.IOException
import java.io.InputStream
import java.nio.file.FileAlreadyExistsException
@ -74,5 +75,15 @@ interface AttachmentStorage {
fun queryAttachments(criteria: AttachmentQueryCriteria): List<AttachmentId> {
return queryAttachments(criteria, null)
}
/**
* Find the Attachment Id of the contract attachment with the highest version for a given contract class name
* from trusted upload sources.
*
* @param contractClassName The fully qualified name of the contract class.
* @param minContractVersion The minimum contract version that should be returned.
* @return the [AttachmentId] of the contract, or null if none meet the criteria.
*/
fun getContractAttachmentWithHighestContractVersion(contractClassName: String, minContractVersion: Int): AttachmentId?
}

View File

@ -16,9 +16,7 @@ import net.corda.core.node.ZoneVersionTooLowException
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationFactory
import net.corda.core.utilities.contextLogger
@ -26,8 +24,10 @@ import net.corda.core.utilities.warnOnce
import java.security.PublicKey
import java.time.Duration
import java.time.Instant
import java.util.*
import kotlin.collections.ArrayList
import java.util.ArrayDeque
import java.util.UUID
import kotlin.collections.component1
import kotlin.collections.component2
/**
* A TransactionBuilder is a transaction class that's mutable (unlike the others which are all immutable). It is
@ -411,13 +411,8 @@ open class TransactionBuilder @JvmOverloads constructor(
require(isReference || constraints.none { it is HashAttachmentConstraint })
val minimumRequiredContractClassVersion = stateRefs?.map { getContractVersion(services.loadContractAttachment(it)) }?.max() ?: DEFAULT_CORDAPP_VERSION
//TODO could be moved as a single method of the attachment service method e.g. getContractAttachmentWithHighestContractVersion(contractClassName, minContractVersion)
val attachmentQueryCriteria = AttachmentQueryCriteria.AttachmentsQueryCriteria(contractClassNamesCondition = Builder.equal(listOf(contractClassName)),
versionCondition = Builder.greaterThanOrEqual(minimumRequiredContractClassVersion),
uploaderCondition = Builder.`in`(TRUSTED_UPLOADERS))
val attachmentSort = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.VERSION, Sort.Direction.DESC)))
return services.attachments.queryAttachments(attachmentQueryCriteria, attachmentSort).firstOrNull() ?: throw MissingContractAttachments(states, minimumRequiredContractClassVersion)
return services.attachments.getContractAttachmentWithHighestContractVersion(contractClassName, minimumRequiredContractClassVersion)
?: throw MissingContractAttachments(states, minimumRequiredContractClassVersion)
}
private fun useWhitelistedByZoneAttachmentConstraint(contractClassName: ContractClassName, networkParameters: NetworkParameters) = contractClassName in networkParameters.whitelistedContractImplementations.keys

View File

@ -8,7 +8,6 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.internal.AbstractAttachment
import net.corda.core.internal.DEPLOYED_CORDAPP_UPLOADER
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.cordapp.CordappImpl.Companion.DEFAULT_CORDAPP_VERSION
@ -16,10 +15,6 @@ import net.corda.core.node.ServicesForResolution
import net.corda.core.node.ZoneVersionTooLowException
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.NetworkParametersStorage
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.serialize
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
@ -45,11 +40,6 @@ class TransactionBuilderTest {
private val contractAttachmentId = SecureHash.randomSHA256()
private val attachments = rigorousMock<AttachmentStorage>()
private val networkParametersStorage = rigorousMock<NetworkParametersStorage>()
private val attachmentQueryCriteria = AttachmentQueryCriteria.AttachmentsQueryCriteria(
contractClassNamesCondition = Builder.equal(listOf("net.corda.testing.contracts.DummyContract")),
versionCondition = Builder.greaterThanOrEqual(DEFAULT_CORDAPP_VERSION),
uploaderCondition = Builder.`in`(listOf(DEPLOYED_CORDAPP_UPLOADER, RPC_UPLOADER)))
private val attachmentSort = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.VERSION, Sort.Direction.DESC)))
@Before
fun setup() {
@ -69,7 +59,8 @@ class TransactionBuilderTest {
doReturn(setOf(DummyContract.PROGRAM_ID)).whenever(attachment).allContracts
doReturn("app").whenever(attachment).uploader
doReturn(emptyList<Party>()).whenever(attachment).signerKeys
doReturn(listOf(contractAttachmentId)).whenever(attachmentStorage).queryAttachments(attachmentQueryCriteria, attachmentSort)
doReturn(contractAttachmentId).whenever(attachmentStorage)
.getContractAttachmentWithHighestContractVersion("net.corda.testing.contracts.DummyContract", DEFAULT_CORDAPP_VERSION)
}
@Test
@ -154,7 +145,8 @@ class TransactionBuilderTest {
doReturn(attachments).whenever(services).attachments
doReturn(signedAttachment).whenever(attachments).openAttachment(contractAttachmentId)
doReturn(listOf(contractAttachmentId)).whenever(attachments).queryAttachments(attachmentQueryCriteria, attachmentSort)
doReturn(contractAttachmentId).whenever(attachments)
.getContractAttachmentWithHighestContractVersion("net.corda.testing.contracts.DummyContract", DEFAULT_CORDAPP_VERSION)
val outputState = TransactionState(data = DummyState(), contract = DummyContract.PROGRAM_ID, notary = notary)
val builder = TransactionBuilder()

View File

@ -10,8 +10,10 @@ import rx.subjects.UnicastSubject
import java.io.Closeable
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger
import javax.persistence.AttributeConverter
import javax.sql.DataSource
@ -110,9 +112,33 @@ class CordaPersistence(
return contextTransactionOrNull ?: newTransaction(isolation)
}
private val liveTransactions = ConcurrentHashMap<UUID, DatabaseTransaction>()
fun newTransaction(isolation: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
val outerTransaction = contextTransactionOrNull
return DatabaseTransaction(isolation.jdbcValue, contextTransactionOrNull, this).also {
contextTransactionOrNull = it
// Outer transaction only exists in a controlled scenario we can ignore.
if (outerTransaction == null) {
liveTransactions.put(it.id, it)
it.onClose { liveTransactions.remove(it.id) }
}
}
}
fun onAllOpenTransactionsClosed(callback: () -> Unit) {
val allOpen = liveTransactions.values.toList()
if (allOpen.isEmpty()) {
callback()
} else {
val counter = AtomicInteger(allOpen.size)
allOpen.forEach {
it.onClose {
if (counter.decrementAndGet() == 0) {
callback()
}
}
}
}
}

View File

@ -6,7 +6,7 @@ import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import java.sql.Connection
import java.util.*
import java.util.UUID
import javax.persistence.EntityManager
fun currentDBSession(): Session = contextTransaction.session
@ -71,6 +71,7 @@ class DatabaseTransaction(
internal val boundary = PublishSubject.create<CordaPersistence.Boundary>()
private var committed = false
private var closed = false
fun commit() {
if (sessionDelegate.isInitialized()) {
@ -96,7 +97,10 @@ class DatabaseTransaction(
connection.close()
contextTransactionOrNull = outerTransaction
if (outerTransaction == null) {
boundary.onNext(CordaPersistence.Boundary(id, committed))
synchronized(this) {
closed = true
boundary.onNext(CordaPersistence.Boundary(id, committed))
}
}
}
@ -107,5 +111,10 @@ class DatabaseTransaction(
fun onRollback(callback: () -> Unit) {
boundary.filter { !it.success }.subscribe { callback() }
}
@Synchronized
fun onClose(callback: () -> Unit) {
if (closed) callback() else boundary.subscribe { callback() }
}
}

View File

@ -8,16 +8,10 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.DEPLOYED_CORDAPP_UPLOADER
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.cordapp.CordappImpl.Companion.DEFAULT_CORDAPP_VERSION
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.NetworkParametersStorage
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.LedgerTransaction
@ -96,12 +90,8 @@ class AttachmentsClassLoaderStaticContractTests {
doReturn("app").whenever(attachment).uploader
doReturn(emptyList<Party>()).whenever(attachment).signerKeys
val contractAttachmentId = SecureHash.randomSHA256()
val attachmentQueryCriteria = AttachmentQueryCriteria.AttachmentsQueryCriteria(
contractClassNamesCondition = Builder.equal(listOf(ATTACHMENT_PROGRAM_ID)),
versionCondition = Builder.greaterThanOrEqual(DEFAULT_CORDAPP_VERSION),
uploaderCondition = Builder.`in`(listOf(DEPLOYED_CORDAPP_UPLOADER, RPC_UPLOADER)))
val attachmentSort = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.VERSION, Sort.Direction.DESC)))
doReturn(listOf(contractAttachmentId)).whenever(attachmentStorage).queryAttachments(attachmentQueryCriteria, attachmentSort)
doReturn(contractAttachmentId).whenever(attachmentStorage)
.getContractAttachmentWithHighestContractVersion(AttachmentDummyContract.ATTACHMENT_PROGRAM_ID, DEFAULT_CORDAPP_VERSION)
}
@Test

View File

@ -0,0 +1,99 @@
package net.corda.nodeapi.internal
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices
import org.junit.After
import org.junit.Test
import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class CordaPersistenceTest {
private val database = configureDatabase(MockServices.makeTestDataSourceProperties(),
DatabaseConfig(),
{ null }, { null },
NodeSchemaService(emptySet()))
@After
fun closeDatabase() {
database.close()
}
@Test
fun `onAllOpenTransactionsClosed with zero transactions calls back immediately`() {
val counter = AtomicInteger(0)
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(1, counter.get())
}
@Test
fun `onAllOpenTransactionsClosed with one transaction calls back after closing`() {
val counter = AtomicInteger(0)
database.transaction {
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(0, counter.get())
}
assertEquals(1, counter.get())
}
@Test
fun `onAllOpenTransactionsClosed after one transaction has closed calls back immediately`() {
val counter = AtomicInteger(0)
database.transaction {
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(0, counter.get())
}
assertEquals(1, counter.get())
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(2, counter.get())
}
@Test
fun `onAllOpenTransactionsClosed with two transactions calls back after closing both`() {
val counter = AtomicInteger(0)
val phaser = openTransactionInOtherThreadAndCloseWhenISay()
// Wait for tx to be started.
phaser.arriveAndAwaitAdvance()
database.transaction {
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(0, counter.get())
}
assertEquals(0, counter.get())
phaser.arriveAndAwaitAdvance()
phaser.arriveAndAwaitAdvance()
assertEquals(1, counter.get())
}
@Test
fun `onAllOpenTransactionsClosed with two transactions calls back after closing both - instigator closes last`() {
val counter = AtomicInteger(0)
val phaser = openTransactionInOtherThreadAndCloseWhenISay()
// Wait for tx to be started.
phaser.arriveAndAwaitAdvance()
database.transaction {
database.onAllOpenTransactionsClosed { counter.incrementAndGet() }
assertEquals(0, counter.get())
phaser.arriveAndAwaitAdvance()
phaser.arriveAndAwaitAdvance()
assertEquals(0, counter.get())
}
assertEquals(1, counter.get())
}
private fun openTransactionInOtherThreadAndCloseWhenISay(): Phaser {
val phaser = Phaser()
phaser.bulkRegister(2)
thread {
database.transaction {
phaser.arriveAndAwaitAdvance()
phaser.arriveAndAwaitAdvance()
}
// Tell caller we have committed.
phaser.arriveAndAwaitAdvance()
}
return phaser
}
}

View File

@ -13,15 +13,18 @@ import net.corda.core.contracts.ContractClassName
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.internal.*
import net.corda.core.internal.cordapp.CordappImpl.Companion.DEFAULT_CORDAPP_VERSION
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_VERSION
import net.corda.core.internal.cordapp.CordappImpl.Companion.DEFAULT_CORDAPP_VERSION
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.*
import net.corda.core.utilities.contextLogger
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.InfrequentlyMutatedCache
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
import net.corda.nodeapi.exceptions.DuplicateAttachmentException
@ -35,7 +38,9 @@ import java.io.InputStream
import java.nio.file.Paths
import java.security.PublicKey
import java.time.Instant
import java.util.*
import java.util.NavigableMap
import java.util.Optional
import java.util.TreeMap
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
@ -333,6 +338,7 @@ class NodeAttachmentService(
session.save(attachment)
attachmentCount.inc()
log.info("Stored new attachment $id")
contractClassNames.forEach { contractsCache.invalidate(it) }
return@withContractsInJar id
}
if (isUploaderTrusted(uploader)) {
@ -343,6 +349,8 @@ class NodeAttachmentService(
attachment.uploader = uploader
session.saveOrUpdate(attachment)
log.info("Updated attachment $id with uploader $uploader")
contractClassNames.forEach { contractsCache.invalidate(it) }
// TODO: this is racey. ENT-2870
attachmentCache.invalidate(id)
attachmentContentCache.invalidate(id)
}
@ -394,4 +402,33 @@ class NodeAttachmentService(
query.resultList.map { AttachmentId.parse(it.attId) }
}
}
private val contractsCache = InfrequentlyMutatedCache<String, NavigableMap<Int, AttachmentId>>("NodeAttachmentService_contractAttachmentVersions", cacheFactory)
override fun getContractAttachmentWithHighestContractVersion(contractClassName: String, minContractVersion: Int): AttachmentId? {
val versions: NavigableMap<Int, AttachmentId> = contractsCache.get(contractClassName) { name ->
val attachmentQueryCriteria = AttachmentQueryCriteria.AttachmentsQueryCriteria(contractClassNamesCondition = Builder.equal(listOf(name)),
versionCondition = Builder.greaterThanOrEqual(0), uploaderCondition = Builder.`in`(TRUSTED_UPLOADERS))
val attachmentSort = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.VERSION, Sort.Direction.DESC)))
database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBAttachment::class.java)
val root = criteriaQuery.from(DBAttachment::class.java)
val criteriaParser = HibernateAttachmentQueryCriteriaParser(criteriaBuilder, criteriaQuery, root)
// parse criteria and build where predicates
criteriaParser.parse(attachmentQueryCriteria, attachmentSort)
// prepare query for execution
val query = session.createQuery(criteriaQuery)
// execution
TreeMap(query.resultList.map { it.version to AttachmentId.parse(it.attId) }.toMap())
}
}
return versions.tailMap(minContractVersion, true).lastEntry()?.value
}
}

View File

@ -0,0 +1,84 @@
package net.corda.node.utilities
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.NamedCacheFactory
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.util.concurrent.atomic.AtomicInteger
/**
* Wraps a Caffeine cache and provides thread safe and database transaction aware invalidation.
*
* All access should be via [get] and [invalidate]. Data to be mutated should be changed at source (presumed to be a database)
* followed by a call to [invalidate] the value associated with a key. During periods of invalidity, the source will always be
* consulted to resolve transaction visibility issues. This is why invalidation should be infrequent, otherwise the pessimism
* of the cache for invalidated values will result in few cache hits.
*/
class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: NamedCacheFactory) {
/**
* Retrieve the value associated with the given key in the cache, or use the function to retrieve the value (and potentially cache it).
*
* @param key The key to retrieve.
* @param valueGetter A function to return the value for the key if the cache does not have it.
*/
fun get(key: K, valueGetter: (K) -> V): V {
val wrapper = backingCache.get(key) { key: K ->
Wrapper.Valid(valueGetter(key))
}
return when(wrapper) {
is Wrapper.Valid -> { wrapper.value }
else -> { valueGetter(key) }
}
}
/**
* Inform the cache that the current value for the key may have been updated. Subsequent calls to [get]
* will not use the current cached value. The point at which values start to be cached again will be
* delayed until any open database transaction for the caller has been closed, to avoid callers to [get]
* who do not have transaction visibility of the updated value from re-populating the cache with an incorrect value.
*/
fun invalidate(key: K) {
backingCache.asMap().compute(key) { key: K, value: Wrapper<V>? ->
when(value) {
is Wrapper.Valid -> { invalidate(key, Wrapper.Invalidated()) }
is Wrapper.Invalidated -> { invalidate(key, value) }
else -> { null }
}
}
}
private fun invalidate(key: K, value: Wrapper.Invalidated<V>): Wrapper.Invalidated<V> {
val tx = contextTransactionOrNull
value.invalidators.incrementAndGet()
if (tx != null) {
// When we close, we can't start using caching again until all simultaneously open transactions are closed.
tx.onClose { tx.database.onAllOpenTransactionsClosed { decrementInvalidators(key, value) } }
} else {
decrementInvalidators(key, value)
}
return value
}
private fun decrementInvalidators(key: K, value: Wrapper.Invalidated<V>) {
if(value.invalidators.decrementAndGet() == 0) {
// Maybe we can replace the invalidated value with nothing, so it gets loaded next time.
backingCache.asMap().compute(key) { key: K, currentValue: Wrapper<V>? ->
if(currentValue === value && value.invalidators.get() == 0) {
null
} else currentValue
}
}
}
private val backingCache = cacheFactory.buildNamed<K, Wrapper<V>>(Caffeine.newBuilder(), name)
private sealed class Wrapper<V : Any> {
abstract val value: V?
class Invalidated<V : Any> : Wrapper<V>() {
val invalidators = AtomicInteger(0)
override val value: V? = null
}
class Valid<V : Any>(override val value: V) : Wrapper<V>()
}
}

View File

@ -44,6 +44,7 @@ open class DefaultNamedCacheFactory protected constructor(private val metricRegi
name == "DBTransactionStorage_transactions" -> caffeine.maximumWeight(transactionCacheSizeBytes)
name == "NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(attachmentContentCacheSizeBytes)
name == "NodeAttachmentService_attachmentPresence" -> caffeine.maximumSize(attachmentCacheBound)
name == "NodeAttachmentService_contractAttachmentVersions" -> caffeine.maximumSize(defaultCacheSize)
name == "PersistentIdentityService_partyByKey" -> caffeine.maximumSize(defaultCacheSize)
name == "PersistentIdentityService_partyByName" -> caffeine.maximumSize(defaultCacheSize)
name == "PersistentNetworkMap_nodesByKey" -> caffeine.maximumSize(defaultCacheSize)

View File

@ -0,0 +1,330 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices
import org.junit.After
import org.junit.Test
import java.util.concurrent.Phaser
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class InfrequentlyMutatedCacheTest {
private val cache = InfrequentlyMutatedCache<String, Int>("foo", TestingNamedCacheFactory())
private val database = configureDatabase(MockServices.makeTestDataSourceProperties(),
DatabaseConfig(),
{ null }, { null },
NodeSchemaService(emptySet()))
@After
fun closeDatabase() {
database.close()
}
@Test
fun `get from empty cache returns result of loader`() {
database.transaction {
// This will cache "1"
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
}
@Test
fun `other thread get returns result of local thread loader`() {
database.transaction {
// This will cache "1"
val result = cache.get("foo") {
1
}
assertEquals(1, result)
// Local thread get landed first.
val otherResult = getInOtherThread("foo", 2)
assertEquals(1, otherResult)
}
}
@Test
fun `second get from empty cache returns result of first loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
val result = cache.get("foo") {
1
}
assertEquals(2, result)
}
}
@Test
fun `second get from empty cache with invalidate in the middle returns result of second loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
}
@Test
fun `other thread get with invalidate in the middle returns result of second loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
val result = cache.get("foo") {
1
}
assertEquals(1, result)
// Whilst inside transaction, invalidate prevents caching.
val otherResult = getInOtherThread("foo", 3)
assertEquals(3, otherResult)
}
}
@Test
fun `third get outside first transaction from empty cache with invalidate in the middle returns result of third loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
// This should not get cached, as the transaction that invalidated is still in-flight.
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
database.transaction {
val result = cache.get("foo") {
3
}
assertEquals(3, result)
}
}
@Test
fun `other thread get outside first transaction with invalidate in the middle returns result of other thread`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
// This should not get cached, as the transaction that invalidated is still in-flight.
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
// Now outside transaction that invalidated, caching can begin again.
val otherResult = getInOtherThread("foo", 3)
assertEquals(3, otherResult)
database.transaction {
val result = cache.get("foo") {
4
}
assertEquals(3, result)
}
}
@Test
fun `fourth get outside first transaction from empty cache with invalidate in the middle returns result of third loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
// This should not get cached, as the transaction that invalidated is still in-flight.
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
// This can now get cached, as the transaction that invalidated is complete.
database.transaction {
val result = cache.get("foo") {
3
}
assertEquals(3, result)
}
database.transaction {
val result = cache.get("foo") {
4
}
assertEquals(3, result)
}
}
@Test
fun `fourth get outside first transaction from empty cache with nested invalidate in the middle returns result of third loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
cache.invalidate("foo")
cache.invalidate("foo")
// This should not get cached, as the transaction that invalidated is still in-flight.
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
// This can now get cached, as the transaction that invalidated is complete.
database.transaction {
val result = cache.get("foo") {
3
}
assertEquals(3, result)
}
database.transaction {
val result = cache.get("foo") {
4
}
assertEquals(3, result)
}
}
@Test
fun `fourth get outside first transaction from empty cache with invalidate in other thread in the middle returns result of second loader`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
invalidateInOtherThread("foo")
// This should not get cached, as the transaction that invalidated it in the other thread has completed but we might
// not see the new value in our transaction since it started first.
val result = cache.get("foo") {
1
}
assertEquals(1, result)
}
// This can now get cached, as the transaction that invalidated is complete.
database.transaction {
val result = cache.get("foo") {
3
}
assertEquals(3, result)
}
}
@Test
fun `transaction started before invalidating thread commits does not cache until after the other thread commits`() {
database.transaction {
// This will cache "2"
cache.get("foo") {
2
}
}
val phaser = invalidateInOtherThreadWhenISay("foo")
// Wait for other thread to start their transaction.
phaser.arriveAndAwaitAdvance()
// Tell other thread to call invalidate
phaser.arriveAndAwaitAdvance()
// Wait for the other thread to call invalidate
phaser.arriveAndAwaitAdvance()
database.transaction {
// This should not get cached, as the transaction that invalidated it in the other thread has completed but we might
// not see the new value in our transaction since it started first.
val result1 = cache.get("foo") {
1
}
assertEquals(1, result1)
val result2 = cache.get("foo") {
3
}
assertEquals(3, result2)
// Now allow other thread to commit transaction
phaser.arriveAndAwaitAdvance()
// and wait for commit to be complete
phaser.arriveAndAwaitAdvance()
// This should get cached, as the transaction that invalidated it in the other thread has completed but we might
// not see the new value in our transaction since it started first.
val result3 = cache.get("foo") {
3
}
assertEquals(3, result3)
val result4 = cache.get("foo") {
4
}
assertEquals(4, result4)
}
// This can now get cached, as the transaction that invalidated is complete.
database.transaction {
val result = cache.get("foo") {
5
}
assertEquals(5, result)
val result2 = cache.get("foo") {
6
}
assertEquals(5, result2)
}
}
private fun getInOtherThread(key: String, loader: Int): Int {
val futureValue = SettableFuture.create<Int>()
thread {
database.transaction {
val result = cache.get(key) {
loader
}
futureValue.set(result)
}
}
return futureValue.get()
}
private fun invalidateInOtherThread(key: String) {
val futureValue = SettableFuture.create<Unit>()
thread {
database.transaction {
cache.invalidate(key)
futureValue.set(Unit)
}
}
return futureValue.get()
}
private fun invalidateInOtherThreadWhenISay(key: String): Phaser {
val phaser = Phaser()
phaser.bulkRegister(2)
thread {
database.transaction {
// Wait for caller and tell them we have started a transaction.
phaser.arriveAndAwaitAdvance()
// Wait for caller to say it's okay to invalidate.
phaser.arriveAndAwaitAdvance()
cache.invalidate(key)
// Tell caller we have invalidated.
phaser.arriveAndAwaitAdvance()
// Wait for caller to allow us to commit transaction.
phaser.arriveAndAwaitAdvance()
}
// Tell caller we have committed.
phaser.arriveAndAwaitAdvance()
}
return phaser
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.ContractClassName
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.internal.AbstractAttachment
import net.corda.core.internal.TRUSTED_UPLOADERS
import net.corda.core.internal.UNKNOWN_UPLOADER
import net.corda.core.internal.cordapp.CordappImpl.Companion.DEFAULT_CORDAPP_VERSION
import net.corda.core.internal.readFully
@ -15,11 +16,12 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.ColumnPredicate
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.nodeapi.internal.withContractsInJar
import java.io.InputStream
import java.security.PublicKey
import java.util.*
import java.util.HashMap
import java.util.jar.Attributes
import java.util.jar.JarInputStream
@ -114,4 +116,11 @@ class MockAttachmentStorage : AttachmentStorage, SingletonSerializeAsToken() {
}
return sha256
}
override fun getContractAttachmentWithHighestContractVersion(contractClassName: String, minContractVersion: Int): AttachmentId? {
val attachmentQueryCriteria = AttachmentQueryCriteria.AttachmentsQueryCriteria(contractClassNamesCondition = Builder.equal(listOf(contractClassName)),
versionCondition = Builder.greaterThanOrEqual(minContractVersion), uploaderCondition = Builder.`in`(TRUSTED_UPLOADERS))
val attachmentSort = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.VERSION, Sort.Direction.DESC)))
return queryAttachments(attachmentQueryCriteria, attachmentSort).firstOrNull()
}
}