mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
* ENT-4237: Added timestamp to the node_transactions table. * ENT-4237: Clock for timestamp now retrieved from ServiceHub. And now record verification time as well. * ENT-4237: Fixed tests. Also enabled stream output in allParallelIntegrationTest. * ENT-4237: Changed timestamp to a val. * ENT-4237: Changed streamOutput to false for allParallelIntegrationTest * ENT-4237: Unit tests added for new timestamp column. Also now passing a clock into DBTransactionStorage. * ENT-4237: Added more unit tests to check timestamp * ENT-4237: Fix test to actually change clock time when testing transaction time does not change.
This commit is contained in:
parent
8f7346f84c
commit
4a35b99283
@ -846,7 +846,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
protected open fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
|
||||
return DBTransactionStorage(database, cacheFactory)
|
||||
return DBTransactionStorage(database, cacheFactory, platformClock)
|
||||
}
|
||||
|
||||
protected open fun makeNetworkParametersStorage(): NetworkParametersStorage {
|
||||
|
@ -8,6 +8,7 @@ import liquibase.exception.ValidationErrors
|
||||
import liquibase.resource.ResourceAccessor
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.SimpleClock
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.persistence.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -16,6 +17,7 @@ import net.corda.nodeapi.internal.persistence.SchemaMigration.Companion.NODE_X50
|
||||
import java.io.PrintWriter
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLFeatureNotSupportedException
|
||||
import java.time.Clock
|
||||
import java.util.logging.Logger
|
||||
import javax.sql.DataSource
|
||||
|
||||
@ -62,7 +64,7 @@ abstract class CordaMigration : CustomTaskChange {
|
||||
|
||||
cordaDB.transaction {
|
||||
identityService.ourNames = setOf(ourName)
|
||||
val dbTransactions = DBTransactionStorage(cordaDB, cacheFactory)
|
||||
val dbTransactions = DBTransactionStorage(cordaDB, cacheFactory, SimpleClock(Clock.systemUTC()))
|
||||
val attachmentsService = NodeAttachmentService(metricRegistry, cacheFactory, cordaDB)
|
||||
_servicesForResolution = MigrationServicesForResolution(identityService, attachmentsService, dbTransactions, cordaDB, cacheFactory)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
||||
@ -24,11 +25,13 @@ import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
import kotlin.streams.toList
|
||||
|
||||
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
|
||||
private val clock: CordaClock) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
|
||||
@Suppress("MagicNumber") // database column width
|
||||
@Entity
|
||||
@ -47,8 +50,11 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
|
||||
@Column(name = "status", nullable = false, length = 1)
|
||||
@Convert(converter = TransactionStatusConverter::class)
|
||||
val status: TransactionStatus
|
||||
)
|
||||
val status: TransactionStatus,
|
||||
|
||||
@Column(name = "timestamp", nullable = false)
|
||||
val timestamp: Instant
|
||||
)
|
||||
|
||||
enum class TransactionStatus {
|
||||
UNVERIFIED,
|
||||
@ -105,7 +111,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
}
|
||||
|
||||
fun createTransactionsMap(cacheFactory: NamedCacheFactory)
|
||||
fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock)
|
||||
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
|
||||
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
|
||||
cacheFactory = cacheFactory,
|
||||
@ -121,7 +127,8 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
txId = key.toString(),
|
||||
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString(),
|
||||
transaction = value.toSignedTx().serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
|
||||
status = value.status
|
||||
status = value.status,
|
||||
timestamp = clock.instant()
|
||||
)
|
||||
},
|
||||
persistentEntityClass = DBTransaction::class.java,
|
||||
@ -135,7 +142,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
}
|
||||
|
||||
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory))
|
||||
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory, clock))
|
||||
|
||||
private fun updateTransaction(txId: SecureHash): Boolean {
|
||||
val session = currentDBSession()
|
||||
@ -147,6 +154,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
|
||||
criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
|
||||
))
|
||||
criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
|
||||
val update = session.createQuery(criteriaUpdate)
|
||||
val rowsUpdated = update.executeUpdate()
|
||||
return rowsUpdated != 0
|
||||
|
@ -24,6 +24,7 @@
|
||||
<include file="migration/node-core.changelog-v13.xml"/>
|
||||
<!-- This change should be done before the v14-data migration. -->
|
||||
<include file="migration/node-core.changelog-v15.xml"/>
|
||||
<include file="migration/node-core.changelog-v16.xml"/>
|
||||
|
||||
<!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
|
||||
<include file="migration/vault-schema.changelog-v9.xml"/>
|
||||
|
@ -0,0 +1,14 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
|
||||
<changeSet author="R3.Corda" id="add_timestamp_column_to_node_transactions">
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="timestamp" type="TIMESTAMP" defaultValueDate="2000-01-01 12:00:00">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -50,6 +50,7 @@ import org.mockito.Mockito
|
||||
import java.security.KeyPair
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
@ -208,7 +209,8 @@ class VaultStateMigrationTest {
|
||||
txId = tx.id.toString(),
|
||||
stateMachineRunId = null,
|
||||
transaction = tx.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes,
|
||||
status = DBTransactionStorage.TransactionStatus.VERIFIED
|
||||
status = DBTransactionStorage.TransactionStatus.VERIFIED,
|
||||
timestamp = Instant.now()
|
||||
)
|
||||
session.save(persistentTx)
|
||||
}
|
||||
|
@ -9,6 +9,9 @@ import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.MutableClock
|
||||
import net.corda.node.SimpleClock
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.*
|
||||
@ -22,6 +25,8 @@ import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@ -51,6 +56,116 @@ class DBTransactionStorageTests {
|
||||
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||
}
|
||||
|
||||
private class TransactionClock(var timeNow: Instant,
|
||||
override var delegateClock: Clock = systemUTC()) : MutableClock(delegateClock) {
|
||||
override fun instant(): Instant = timeNow
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create verified transaction and validate timestamp in db`() {
|
||||
val now = Instant.ofEpochSecond(111222333L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addTransaction(transaction)
|
||||
assertEquals(now, readTransactionTimestampFromDB(transaction.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create unverified transaction and validate timestamp in db`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addUnverifiedTransaction(transaction)
|
||||
assertEquals(now, readTransactionTimestampFromDB(transaction.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create unverified then verified transaction and validate timestamps in db`() {
|
||||
val unverifiedTime = Instant.ofEpochSecond(555666777L)
|
||||
val verifiedTime = Instant.ofEpochSecond(888999111L)
|
||||
val transactionClock = TransactionClock(unverifiedTime)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addUnverifiedTransaction(transaction)
|
||||
assertEquals(unverifiedTime, readTransactionTimestampFromDB(transaction.id))
|
||||
transactionClock.timeNow = verifiedTime
|
||||
transactionStorage.addTransaction(transaction)
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(transaction.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check timestamp does not change when attempting to move transaction from verified to unverified`() {
|
||||
|
||||
val verifiedTime = Instant.ofEpochSecond(555666222L)
|
||||
val differentTime = Instant.ofEpochSecond(888777666L)
|
||||
val transactionClock = TransactionClock(verifiedTime)
|
||||
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(transaction)
|
||||
}
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(transaction.id))
|
||||
transactionClock.timeNow = differentTime
|
||||
database.transaction {
|
||||
transactionStorage.addUnverifiedTransaction(transaction)
|
||||
}
|
||||
assertTransactionIsRetrievable(transaction)
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(transaction.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check timestamp does not change when transaction saved twice in same DB transaction scope`() {
|
||||
val verifiedTime = Instant.ofEpochSecond(3333666222L)
|
||||
val differentTime = Instant.ofEpochSecond(111777666L)
|
||||
val transactionClock = TransactionClock(verifiedTime)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val firstTransaction = newTransaction()
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
transactionClock.timeNow = differentTime
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
}
|
||||
assertTransactionIsRetrievable(firstTransaction)
|
||||
assertThat(transactionStorage.transactions).containsOnly(firstTransaction)
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(firstTransaction.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check timestamp does not change when transaction saved twice in two DB transaction scopes`() {
|
||||
val verifiedTime = Instant.ofEpochSecond(11119999222L)
|
||||
val differentTime = Instant.ofEpochSecond(666333222L)
|
||||
val transactionClock = TransactionClock(verifiedTime)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val firstTransaction = newTransaction()
|
||||
val secondTransaction = newTransaction()
|
||||
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(firstTransaction.id))
|
||||
transactionClock.timeNow = differentTime
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(secondTransaction)
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
}
|
||||
assertTransactionIsRetrievable(firstTransaction)
|
||||
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
|
||||
assertEquals(verifiedTime, readTransactionTimestampFromDB(firstTransaction.id))
|
||||
}
|
||||
|
||||
private fun readTransactionTimestampFromDB(id: SecureHash): Instant {
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
|
||||
DBTransactionStorage.DBTransaction::class.java
|
||||
).setParameter("transactionId", id.toString()).resultList.map { it }
|
||||
}
|
||||
assertEquals(1, fromDb.size)
|
||||
return fromDb[0].timestamp
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `empty store`() {
|
||||
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
|
||||
@ -198,9 +313,9 @@ class DBTransactionStorageTests {
|
||||
assertTransactionIsRetrievable(secondTransaction)
|
||||
}
|
||||
|
||||
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null) {
|
||||
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) {
|
||||
transactionStorage = DBTransactionStorage(database, TestingNamedCacheFactory(cacheSizeBytesOverride
|
||||
?: 1024))
|
||||
?: 1024), clock)
|
||||
}
|
||||
|
||||
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
|
||||
|
@ -34,6 +34,7 @@ import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.sql.SQLException
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.test.assertEquals
|
||||
@ -284,7 +285,8 @@ class RetryFlowMockTest {
|
||||
}
|
||||
|
||||
private fun doInsert() {
|
||||
val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES, DBTransactionStorage.TransactionStatus.VERIFIED)
|
||||
val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES,
|
||||
DBTransactionStorage.TransactionStatus.VERIFIED, Instant.now())
|
||||
contextTransaction.session.save(tx)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user