ENT-9923 Ledger Recovery: split out recovery metadata into own database schema. (#7364)

This commit is contained in:
Jose Coll 2023-05-24 09:42:09 +01:00 committed by GitHub
parent c7e21b3a65
commit 2e29e36e01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1077 additions and 164 deletions

View File

@ -14,7 +14,6 @@ import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryError
@ -24,6 +23,7 @@ import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.ReceiveTransactionFlow
import net.corda.core.flows.SendTransactionFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
@ -48,6 +48,9 @@ import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.issuedBy
import net.corda.finance.test.flows.CashIssueWithObserversFlow
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery
import net.corda.node.services.persistence.DistributionRecord
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
@ -61,6 +64,7 @@ import net.corda.testing.node.internal.FINANCE_WORKFLOWS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.MOCK_VERSION_INFO
import net.corda.testing.node.internal.MockCryptoService
import net.corda.testing.node.internal.TestCordappInternal
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.cordappWithPackages
@ -345,6 +349,29 @@ class FinalityFlowTests : WithFinality {
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(getSenderRecoveryData(stx.id, aliceNode.database)).isNotNull
assertThat(getReceiverRecoveryData(stx.id, bobNode.database)).isNotNull
}
private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
return fromDb.singleOrNull()?.toSenderDistributionRecord()
}
private fun getReceiverRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
return fromDb.singleOrNull()?.toReceiverDistributionRecord(MockCryptoService(emptyMap()))
}
@StartableByRPC
@ -423,7 +450,7 @@ class FinalityFlowTests : WithFinality {
require(NotarySigCheck.needsNotarySignature(stx))
logger.info("Peer recording transaction without notary signature.")
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
FlowTransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT))
TransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT))
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
logger.info("Peer recorded transaction without notary signature.")

View File

@ -227,7 +227,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
else {
if (newPlatformSessions.isNotEmpty())
finaliseLocallyAndBroadcast(newPlatformSessions, transaction,
FlowTransactionMetadata(
TransactionMetadata(
serviceHub.myInfo.legalIdentities.first().name,
statesToRecord,
sessions.map { it.counterparty.name }.toSet()))
@ -258,7 +258,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
@Suspendable
private fun finaliseLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction, metadata: FlowTransactionMetadata) {
private fun finaliseLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction, metadata: TransactionMetadata) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocallyAndBroadcast", flowLogic = this) {
finaliseLocally(tx, metadata = metadata)
progressTracker.currentStep = BROADCASTING
@ -310,7 +310,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
@Suspendable
private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List<TransactionSignature> = emptyList(),
metadata: FlowTransactionMetadata? = null) {
metadata: TransactionMetadata? = null) {
progressTracker.currentStep = FINALISING_TRANSACTION
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocally", flowLogic = this) {
if (notarySignatures.isEmpty()) {
@ -405,7 +405,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
progressTracker.currentStep = RECORD_UNNOTARISED
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx,
FlowTransactionMetadata(
TransactionMetadata(
serviceHub.myInfo.legalIdentities.first().name,
statesToRecord,
sessions.map { it.counterparty.name }.toSet()))
@ -496,7 +496,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
TransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
@ -523,7 +523,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
} else {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord,
FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
TransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
logger.info("Peer recorded transaction with recovery metadata.")
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)

View File

@ -10,20 +10,19 @@ import java.time.Instant
*/
@CordaSerializable
data class FlowTransaction(
data class FlowTransactionInfo(
val stateMachineRunId: StateMachineRunId,
val txId: String,
val status: TransactionStatus,
val signatures: ByteArray?,
val timestamp: Instant,
val metadata: FlowTransactionMetadata?) {
val metadata: TransactionMetadata?
) {
fun isInitiator(myCordaX500Name: CordaX500Name) =
this.metadata?.initiator == myCordaX500Name
this.metadata?.initiator == myCordaX500Name
}
@CordaSerializable
data class FlowTransactionMetadata(
data class TransactionMetadata(
val initiator: CordaX500Name,
val statesToRecord: StatesToRecord? = StatesToRecord.ONLY_RELEVANT,
val peers: Set<CordaX500Name>? = null
@ -34,4 +33,31 @@ enum class TransactionStatus {
UNVERIFIED,
VERIFIED,
IN_FLIGHT;
}
@CordaSerializable
data class RecoveryTimeWindow(val fromTime: Instant, val untilTime: Instant = Instant.now()) {
init {
if (untilTime < fromTime) {
throw IllegalArgumentException("$fromTime must be before $untilTime")
}
}
companion object {
@JvmStatic
fun between(fromTime: Instant, untilTime: Instant): RecoveryTimeWindow {
return RecoveryTimeWindow(fromTime, untilTime)
}
@JvmStatic
fun fromOnly(fromTime: Instant): RecoveryTimeWindow {
return RecoveryTimeWindow(fromTime = fromTime)
}
@JvmStatic
fun untilOnly(untilTime: Instant): RecoveryTimeWindow {
return RecoveryTimeWindow(fromTime = Instant.EPOCH, untilTime = untilTime)
}
}
}

View File

@ -4,7 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DeleteForDJVM
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.TransactionMetadata
import net.corda.core.internal.notary.NotaryService
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
@ -37,7 +37,7 @@ interface ServiceHubCoreInternal : ServiceHub {
* @param txn The transaction to record.
* @param metadata Finality flow recovery metadata.
*/
fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata)
fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata)
/**
* Removes transaction from data store.
@ -63,7 +63,7 @@ interface ServiceHubCoreInternal : ServiceHub {
* @param statesToRecord how the vault should treat the output states of the transaction.
* @param metadata Finality flow recovery metadata.
*/
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata)
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata)
}
interface TransactionsResolver {

View File

@ -74,8 +74,7 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.startFlow(::GetFlowTransaction, txId).returnValue.getOrThrow().apply {
assertEquals("V", this.first) // "V" -> VERIFIED
assertEquals(ALICE_NAME.toString(), this.second) // initiator
assertEquals(CHARLIE_NAME.toString(), this.third) // peers
assertEquals(CHARLIE_NAME.hashCode().toLong(), this.second) // peer
}
}
}
@ -84,18 +83,25 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
// Internal use for testing only!!
@StartableByRPC
class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Triple<String, String, String>>() {
class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Pair<String, Long>>() {
@Suspendable
override fun call(): Triple<String, String, String> {
return serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?")
override fun call(): Pair<String, Long> {
val transactionStatus = serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?")
.apply { setString(1, txId.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
Triple(rs.getString(4), // TransactionStatus
rs.getString(7), // initiator
rs.getString(8)) // participants
rs.getString(4) // TransactionStatus
}
}
val receiverPartyId = serviceHub.jdbcSession().prepareStatement("select * from node_sender_distribution_records where tx_id = ?")
.apply { setString(1, txId.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(2) // receiverPartyId
}
}
return Pair(transactionStatus, receiverPartyId)
}
}

View File

@ -20,6 +20,7 @@ import net.corda.testing.node.internal.cordappWithPackages
import org.assertj.core.api.Assertions.assertThat
import org.junit.BeforeClass
import org.junit.ClassRule
import org.junit.Ignore
import org.junit.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
@ -60,7 +61,8 @@ class DeterministicContractWithCustomSerializerTest {
}
@Test(timeout=300_000)
fun `test DJVM can verify using custom serializer`() {
@Ignore("Flaky test in CI: org.opentest4j.AssertionFailedError: Unexpected exception thrown: net.corda.client.rpc.RPCException: Class \"class net.corda.contracts.serialization.custom.Currantsy\" is not on the whitelist or annotated with @CordaSerializable.")
fun `test DJVM can verify using custom serializer`() {
driver(parametersFor(djvmSources, listOf(flowCordapp, contractCordapp))) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val txId = assertDoesNotThrow {

View File

@ -24,7 +24,7 @@ import org.junit.Rule
import org.junit.Test
class PersistentNetworkMapCacheTest {
private companion object {
internal companion object {
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB = TestIdentity(BOB_NAME, 80)
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)

View File

@ -0,0 +1,95 @@
package net.corda.node.services.network
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.network.PersistentNetworkMapCacheTest.Companion.ALICE
import net.corda.node.services.network.PersistentNetworkMapCacheTest.Companion.BOB
import net.corda.node.services.network.PersistentNetworkMapCacheTest.Companion.CHARLIE
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
import org.junit.Test
class PersistentPartyInfoCacheTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private var portCounter = 1000
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
private val charlieNetMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))
@Test(timeout=300_000)
fun `get party id from CordaX500Name sourced from NetworkMapCache`() {
charlieNetMapCache.addOrUpdateNodes(listOf(
createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE))))
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
partyInfoCache.start()
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong())
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong())
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong())
}
@Test(timeout=300_000)
fun `get party id from CordaX500Name sourced from backing database`() {
charlieNetMapCache.addOrUpdateNodes(listOf(
createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE))))
PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database).start()
// clear network map cache & bootstrap another PersistentInfoCache
charlieNetMapCache.clearNetworkMapCache()
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong())
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong())
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong())
}
@Test(timeout=300_000)
fun `get party CordaX500Name from id sourced from NetworkMapCache`() {
charlieNetMapCache.addOrUpdateNodes(listOf(
createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE))))
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
partyInfoCache.start()
assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name)
assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name)
assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name)
}
@Test(timeout=300_000)
fun `get party CordaX500Name from id sourced from backing database`() {
charlieNetMapCache.addOrUpdateNodes(listOf(
createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE))))
PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database).start()
// clear network map cache & bootstrap another PersistentInfoCache
charlieNetMapCache.clearNetworkMapCache()
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name)
assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name)
assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name)
}
private fun createNodeInfo(identities: List<TestIdentity>,
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
return NodeInfo(
addresses = listOf(address),
legalIdentitiesAndCerts = identities.map { it.identity },
platformVersion = 3,
serial = 1
)
}
}

View File

@ -38,6 +38,7 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.div
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps
import net.corda.core.internal.notary.NotaryService
@ -121,13 +122,14 @@ import net.corda.node.services.network.NetworkParameterUpdateListener
import net.corda.node.services.network.NetworkParametersHotloader
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.PersistentPartyInfoCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.AttachmentStorageInternal
import net.corda.node.services.persistence.DBCheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl
@ -285,6 +287,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
val networkMapCache = PersistentNetworkMapCache(cacheFactory, database, identityService).tokenize()
val partyInfoCache = PersistentPartyInfoCache(networkMapCache, cacheFactory, database)
@Suppress("LeakingThis")
val cryptoService = makeCryptoService()
@Suppress("LeakingThis")
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
@ -296,8 +301,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
).tokenize()
val attachmentTrustCalculator = makeAttachmentTrustCalculator(configuration, database)
@Suppress("LeakingThis")
val cryptoService = makeCryptoService()
@Suppress("LeakingThis")
val networkParametersStorage = makeNetworkParametersStorage()
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(configuration.cordappDirectories), attachments).tokenize()
val diagnosticsService = NodeDiagnosticsService().tokenize()
@ -694,6 +697,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
log.warn("Not distributing events as NetworkMap is not ready")
}
}
nodeReadyFuture.thenMatch({
partyInfoCache.start()
}, { th -> log.error("Unexpected exception during cache initialisation", th) })
setNodeStatus(NodeStatus.STARTED)
return resultingNodeInfo
}
@ -1077,7 +1084,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
protected open fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
return DBTransactionStorage(database, cacheFactory, platformClock)
return DBTransactionStorageLedgerRecovery(database, cacheFactory, platformClock, cryptoService, partyInfoCache)
}
protected open fun makeNetworkParametersStorage(): NetworkParametersStorage {

View File

@ -5,7 +5,7 @@ import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.TransactionStatus
import net.corda.core.internal.FlowStateMachineHandle
@ -240,25 +240,27 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
)
}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) {
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) {
requireSupportedHashType(txn)
if (txn.coreTransaction is WireTransaction)
txn.verifyRequiredSignatures()
database.transaction {
recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
validatedTransactions.finalizeTransaction(txn, metadata)
val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name
validatedTransactions.finalizeTransaction(txn, metadata, isInitiator)
}
}
}
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) {
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) {
if (txn.coreTransaction is WireTransaction) {
txn.notary?.let { notary ->
txn.verifySignaturesExcept(notary.owningKey)
} ?: txn.verifyRequiredSignatures()
}
database.transaction {
validatedTransactions.addUnnotarisedTransaction(txn, metadata)
val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name
validatedTransactions.addUnnotarisedTransaction(txn, metadata, isInitiator)
}
}
@ -361,7 +363,7 @@ interface WritableTransactionStorage : TransactionStorage {
* @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/
fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean
fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean
/**
* Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store.
@ -376,7 +378,7 @@ interface WritableTransactionStorage : TransactionStorage {
* @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/
fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean
fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean
/**
* Update a previously un-notarised transaction including associated notary signatures.

View File

@ -0,0 +1,80 @@
package net.corda.node.services.network
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.node.services.NetworkMapCache
import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.nodeapi.internal.persistence.CordaPersistence
import org.hibernate.Session
import rx.Observable
class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMapCache,
cacheFactory: NamedCacheFactory,
private val database: CordaPersistence) {
// probably better off using a BiMap here: https://www.baeldung.com/guava-bimap
private val cordaX500NameToPartyIdCache = NonInvalidatingCache<CordaX500Name, Long?>(
cacheFactory = cacheFactory,
name = "RecoveryPartyInfoCache_byCordaX500Name") { key ->
database.transaction { queryByCordaX500Name(session, key) }
}
private val partyIdToCordaX500NameCache = NonInvalidatingCache<Long, CordaX500Name?>(
cacheFactory = cacheFactory,
name = "RecoveryPartyInfoCache_byPartyId") { key ->
database.transaction { queryByPartyId(session, key) }
}
private lateinit var trackNetworkMapUpdates: Observable<NetworkMapCache.MapChange>
fun start() {
val (snapshot, updates) = networkMapCache.track()
snapshot.map { entry ->
entry.legalIdentities.map { party ->
add(party.name.hashCode().toLong(), party.name)
}
}
trackNetworkMapUpdates = updates
trackNetworkMapUpdates.cache().forEach { nodeInfo ->
nodeInfo.node.legalIdentities.map { party ->
add(party.name.hashCode().toLong(), party.name)
}
}
}
fun getPartyIdByCordaX500Name(name: CordaX500Name): Long = cordaX500NameToPartyIdCache[name] ?: throw IllegalStateException("Missing cache entry for $name")
fun getCordaX500NameByPartyId(partyId: Long): CordaX500Name = partyIdToCordaX500NameCache[partyId] ?: throw IllegalStateException("Missing cache entry for $partyId")
private fun add(partyHashCode: Long, partyName: CordaX500Name) {
partyIdToCordaX500NameCache.cache.put(partyHashCode, partyName)
cordaX500NameToPartyIdCache.cache.put(partyName, partyHashCode)
updateInfoDB(partyHashCode, partyName)
}
private fun updateInfoDB(partyHashCode: Long, partyName: CordaX500Name) {
database.transaction {
if (queryByPartyId(session, partyHashCode) == null) {
println("PartyInfo: $partyHashCode -> $partyName")
session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode, partyName.toString()))
}
}
}
private fun queryByCordaX500Name(session: Session, key: CordaX500Name): Long? {
val query = session.createQuery(
"FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyName = :partyName",
DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java)
query.setParameter("partyName", key.toString())
return query.resultList.singleOrNull()?.partyId
}
private fun queryByPartyId(session: Session, key: Long): CordaX500Name? {
val query = session.createQuery(
"FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyId = :partyId",
DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java)
query.setParameter("partyId", key)
return query.resultList.singleOrNull()?.partyName?.let { CordaX500Name.parse(it) }
}
}

View File

@ -3,15 +3,13 @@ package net.corda.node.services.persistence
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.flows.TransactionMetadata
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
@ -52,8 +50,8 @@ import javax.persistence.Table
import kotlin.streams.toList
@Suppress("TooManyFunctions")
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
private val clock: CordaClock) : WritableTransactionStorage, SingletonSerializeAsToken() {
open class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
private val clock: CordaClock) : WritableTransactionStorage, SingletonSerializeAsToken() {
@Suppress("MagicNumber") // database column width
@Entity
@ -78,26 +76,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
val timestamp: Instant,
@Column(name = "signatures")
val signatures: ByteArray?,
/**
* Flow finality metadata used for recovery
* TODO: create association table solely for Flow metadata and recovery purposes.
* See https://r3-cev.atlassian.net/browse/ENT-9521
*/
/** X500Name of flow initiator **/
@Column(name = "initiator")
val initiator: String? = null,
/** X500Name of flow participant parties **/
@Column(name = "participants")
@Convert(converter = StringListConverter::class)
val participants: List<String>? = null,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "states_to_record")
val statesToRecord: StatesToRecord? = null
val signatures: ByteArray?
)
enum class TransactionStatus {
@ -150,21 +129,6 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
}
}
@Converter
class StringListConverter : AttributeConverter<List<String>?, String?> {
override fun convertToDatabaseColumn(stringList: List<String>?): String? {
return stringList?.let { if (it.isEmpty()) null else it.joinToString(SPLIT_CHAR) }
}
override fun convertToEntityAttribute(string: String?): List<String>? {
return string?.split(SPLIT_CHAR)
}
companion object {
private const val SPLIT_CHAR = ";"
}
}
internal companion object {
const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions."
@ -187,7 +151,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
private fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock)
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
return WeightBasedAppendOnlyPersistentMap(
cacheFactory = cacheFactory,
name = "DBTransactionStorage_transactions",
toPersistentEntityKey = SecureHash::toString,
@ -195,14 +159,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
SecureHash.create(dbTxn.txId) to TxCacheValue(
dbTxn.transaction.deserialize(context = contextToUse()),
dbTxn.status,
dbTxn.signatures?.deserialize(context = contextToUse()),
dbTxn.initiator?.let { initiator ->
FlowTransactionMetadata(
CordaX500Name.parse(initiator),
dbTxn.statesToRecord!!,
dbTxn.participants?.let { it.map { CordaX500Name.parse(it) }.toSet() }
)
}
dbTxn.signatures?.deserialize(context = contextToUse())
)
},
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
@ -212,10 +169,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
transaction = value.toSignedTx().serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
status = value.status,
timestamp = clock.instant(),
signatures = value.sigs.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
statesToRecord = value.metadata?.statesToRecord,
initiator = value.metadata?.initiator?.toString(),
participants = value.metadata?.peers?.map { it.toString() }
signatures = value.sigs.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes
)
},
persistentEntityClass = DBTransaction::class.java,
@ -254,18 +208,18 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
updateTransaction(transaction.id)
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
addTransaction(transaction, metadata, TransactionStatus.IN_FLIGHT) {
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
addTransaction(transaction, TransactionStatus.IN_FLIGHT) {
false
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
addTransaction(transaction, metadata) {
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
addTransaction(transaction) {
false
}
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val delete = criteriaBuilder.createCriteriaDelete(DBTransaction::class.java)
val root = delete.from(DBTransaction::class.java)
@ -289,13 +243,12 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
finalizeTransactionWithExtraSignatures(transaction.id, signatures)
}
private fun addTransaction(transaction: SignedTransaction,
metadata: FlowTransactionMetadata? = null,
protected fun addTransaction(transaction: SignedTransaction,
status: TransactionStatus = TransactionStatus.VERIFIED,
updateFn: (SecureHash) -> Boolean): Boolean {
return database.transaction {
txStorage.locked {
val cachedValue = TxCacheValue(transaction, status, metadata)
val cachedValue = TxCacheValue(transaction, status)
val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) }
if (addedOrUpdated) {
logger.debug { "Transaction ${transaction.id} has been recorded as $status" }
@ -436,29 +389,21 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
}
// Cache value type to just store the immutable bits of a signed transaction plus conversion helpers
private class TxCacheValue(
internal class TxCacheValue(
val txBits: SerializedBytes<CoreTransaction>,
val sigs: List<TransactionSignature>,
val status: TransactionStatus,
// flow metadata recorded for recovery
val metadata: FlowTransactionMetadata? = null
val status: TransactionStatus
) {
constructor(stx: SignedTransaction, status: TransactionStatus) : this(
stx.txBits,
Collections.unmodifiableList(stx.sigs),
status
)
constructor(stx: SignedTransaction, status: TransactionStatus, metadata: FlowTransactionMetadata?) : this(
stx.txBits,
Collections.unmodifiableList(stx.sigs),
status,
metadata
)
constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List<TransactionSignature>?, metadata: FlowTransactionMetadata?) : this(
constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List<TransactionSignature>?) : this(
stx.txBits,
if (sigs == null) Collections.unmodifiableList(stx.sigs) else Collections.unmodifiableList(stx.sigs + sigs).distinct(),
status,
metadata
status
)
fun toSignedTx() = SignedTransaction(txBits, sigs)
}

View File

@ -0,0 +1,330 @@
package net.corda.node.services.persistence
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.node.CordaClock
import net.corda.node.services.network.PersistentPartyInfoCache
import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.serialization.internal.CordaSerializationEncoding
import org.hibernate.annotations.Immutable
import java.io.Serializable
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong
import javax.persistence.Column
import javax.persistence.Embeddable
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.Table
import javax.persistence.criteria.Predicate
import kotlin.streams.toList
class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
val clock: CordaClock,
private val cryptoService: CryptoService,
private val partyInfoCache: PersistentPartyInfoCache) : DBTransactionStorage(database, cacheFactory, clock) {
@Embeddable
@Immutable
data class PersistentKey(
@Column(name = "sequence_number", nullable = false)
var sequenceNumber: Long,
@Column(name = "timestamp", nullable = false)
var timestamp: Instant
) : Serializable {
constructor(key: Key) : this(key.sequenceNumber, key.timestamp)
}
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}sender_distribution_records")
data class DBSenderDistributionRecord(
@EmbeddedId
var compositeKey: PersistentKey,
@Column(name = "tx_id", length = 144, nullable = false)
var txId: String,
/** PartyId of flow peer **/
@Column(name = "receiver_party_id", nullable = false)
val receiverPartyId: Long,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "states_to_record", nullable = false)
var statesToRecord: StatesToRecord
) {
fun toSenderDistributionRecord() =
SenderDistributionRecord(
SecureHash.parse(this.txId),
this.receiverPartyId,
this.statesToRecord,
this.compositeKey.timestamp
)
}
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}receiver_distribution_records")
data class DBReceiverDistributionRecord(
@EmbeddedId
var compositeKey: PersistentKey,
@Column(name = "tx_id", length = 144, nullable = false)
var txId: String,
/** PartyId of flow initiator **/
@Column(name = "sender_party_id", nullable = true)
val senderPartyId: Long,
/** Encrypted information for use by Sender (eg. partyId's of flow peers) **/
@Lob
@Column(name = "distribution_list", nullable = false)
val distributionList: ByteArray,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "receiver_states_to_record", nullable = false)
val receiverStatesToRecord: StatesToRecord,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "sender_states_to_record", nullable = false)
val senderStatesToRecord: StatesToRecord
) {
constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peerPartyIds: Set<Long>, statesToRecord: StatesToRecord, cryptoService: CryptoService) :
this(PersistentKey(key),
txId = txId.toString(),
senderPartyId = initiatorPartyId,
distributionList = cryptoService.encrypt(peerPartyIds.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes),
receiverStatesToRecord = statesToRecord,
senderStatesToRecord = StatesToRecord.NONE // to be set in follow-up PR.
)
fun toReceiverDistributionRecord(cryptoService: CryptoService) =
ReceiverDistributionRecord(
SecureHash.parse(this.txId),
this.senderPartyId,
cryptoService.decrypt(this.distributionList).deserialize(context = contextToUse()),
this.receiverStatesToRecord,
this.compositeKey.timestamp
)
}
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}recovery_party_info")
data class DBRecoveryPartyInfo(
@Id
/** CordaX500Name hashCode() **/
@Column(name = "party_id", nullable = false)
var partyId: Long,
/** CordaX500Name of party **/
@Column(name = "party_name", nullable = false)
val partyName: String
)
class Key(
val timestamp: Instant,
val sequenceNumber: Long = nextSequenceNumber.andIncrement
) {
companion object {
private val nextSequenceNumber = AtomicLong()
}
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
return addTransaction(transaction, TransactionStatus.IN_FLIGHT) {
addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock)
}
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
addTransaction(transaction) {
addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock)
}
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return database.transaction {
super.removeUnnotarisedTransaction(id)
val criteriaBuilder = session.criteriaBuilder
val deleteSenderDistributionRecords = criteriaBuilder.createCriteriaDelete(DBSenderDistributionRecord::class.java)
val root = deleteSenderDistributionRecords.from(DBSenderDistributionRecord::class.java)
deleteSenderDistributionRecords.where(criteriaBuilder.equal(root.get<String>(DBSenderDistributionRecord::txId.name), id.toString()))
val deletedSenderDistributionRecords = session.createQuery(deleteSenderDistributionRecords).executeUpdate() != 0
val deleteReceiverDistributionRecords = criteriaBuilder.createCriteriaDelete(DBReceiverDistributionRecord::class.java)
val rootReceiverDistributionRecord = deleteReceiverDistributionRecords.from(DBReceiverDistributionRecord::class.java)
deleteReceiverDistributionRecords.where(criteriaBuilder.equal(rootReceiverDistributionRecord.get<String>(DBReceiverDistributionRecord::txId.name), id.toString()))
val deletedReceiverDistributionRecords = session.createQuery(deleteReceiverDistributionRecords).executeUpdate() != 0
deletedSenderDistributionRecords || deletedReceiverDistributionRecords
}
}
fun queryDistributionRecords(timeWindow: RecoveryTimeWindow,
recordType: DistributionRecordType = DistributionRecordType.ALL,
excludingTxnIds: Set<SecureHash>? = null,
orderByTimestamp: Sort.Direction? = null
): List<DistributionRecord> {
return when(recordType) {
DistributionRecordType.SENDER ->
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)
DistributionRecordType.RECEIVER ->
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)
DistributionRecordType.ALL ->
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp).plus(
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)
)
}
}
@Suppress("SpreadOperator")
fun querySenderDistributionRecords(timeWindow: RecoveryTimeWindow,
peers: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash>? = null,
orderByTimestamp: Sort.Direction? = null
): List<SenderDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBSenderDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBSenderDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.untilTime)))
excludingTxnIds?.let { excludingTxnIds ->
predicates.add(criteriaBuilder.and(criteriaBuilder.notEqual(txnMetadata.get<String>(DBSenderDistributionRecord::txId.name),
excludingTxnIds.map { it.toString() })))
}
if (peers.isNotEmpty()) {
val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it) }
predicates.add(criteriaBuilder.and(txnMetadata.get<Long>(DBSenderDistributionRecord::receiverPartyId.name).`in`(peerPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria =
when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
Sort.Direction.DESC -> criteriaBuilder.desc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
}
criteriaQuery.orderBy(orderCriteria)
}
val results = session.createQuery(criteriaQuery).stream()
results.map { it.toSenderDistributionRecord() }.toList()
}
}
@Suppress("SpreadOperator")
fun queryReceiverDistributionRecords(timeWindow: RecoveryTimeWindow,
initiators: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash>? = null,
orderByTimestamp: Sort.Direction? = null
): List<ReceiverDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBReceiverDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBReceiverDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.untilTime)))
excludingTxnIds?.let { excludingTxnIds ->
predicates.add(criteriaBuilder.and(criteriaBuilder.notEqual(txnMetadata.get<String>(DBReceiverDistributionRecord::txId.name),
excludingTxnIds.map { it.toString() })))
}
if (initiators.isNotEmpty()) {
val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it) }
predicates.add(criteriaBuilder.and(txnMetadata.get<Long>(DBReceiverDistributionRecord::senderPartyId.name).`in`(initiatorPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria =
when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
Sort.Direction.DESC -> criteriaBuilder.desc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
}
criteriaQuery.orderBy(orderCriteria)
}
val results = session.createQuery(criteriaQuery).stream()
results.map { it.toReceiverDistributionRecord(cryptoService) }.toList()
}
}
@Suppress("IMPLICIT_CAST_TO_ANY")
private fun addTransactionRecoveryMetadata(txId: SecureHash, metadata: TransactionMetadata, isInitiator: Boolean, clock: CordaClock): Boolean {
database.transaction {
if (isInitiator) {
metadata.peers?.map { peer ->
val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())),
txId.toString(),
partyInfoCache.getPartyIdByCordaX500Name(peer),
metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT)
session.save(senderDistributionRecord)
}
} else {
val receiverDistributionRecord =
DBReceiverDistributionRecord(Key(clock.instant()),
txId,
partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator),
metadata.peers?.map { partyInfoCache.getPartyIdByCordaX500Name(it) }?.toSet() ?: emptySet(),
metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT,
cryptoService)
session.save(receiverDistributionRecord)
}
}
return false
}
}
// TO DO: https://r3-cev.atlassian.net/browse/ENT-9876
private fun CryptoService.decrypt(bytes: ByteArray): ByteArray {
return bytes
}
// TO DO: https://r3-cev.atlassian.net/browse/ENT-9876
private fun CryptoService.encrypt(bytes: ByteArray): ByteArray {
return bytes
}
@CordaSerializable
open class DistributionRecord(
open val txId: SecureHash,
open val statesToRecord: StatesToRecord,
open val timestamp: Instant
)
@CordaSerializable
data class SenderDistributionRecord(
override val txId: SecureHash,
val peerPartyId: Long, // CordaX500Name hashCode()
override val statesToRecord: StatesToRecord,
override val timestamp: Instant
) : DistributionRecord(txId, statesToRecord, timestamp)
@CordaSerializable
data class ReceiverDistributionRecord(
override val txId: SecureHash,
val initiatorPartyId: Long, // CordaX500Name hashCode()
val peerPartyIds: Set<Long>, // CordaX500Name hashCode()
override val statesToRecord: StatesToRecord,
override val timestamp: Instant
) : DistributionRecord(txId, statesToRecord, timestamp)
@CordaSerializable
enum class DistributionRecordType {
SENDER, RECEIVER, ALL
}

View File

@ -16,6 +16,7 @@ import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.messaging.P2PMessageDeduplicator
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.PublicKeyHashToExternalId
@ -51,7 +52,10 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
ContractUpgradeServiceImpl.DBContractUpgrade::class.java,
DBNetworkParametersStorage.PersistentNetworkParameters::class.java,
PublicKeyHashToExternalId::class.java,
PersistentNetworkMapCache.PersistentPartyToPublicKeyHash::class.java
PersistentNetworkMapCache.PersistentPartyToPublicKeyHash::class.java,
DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java,
DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java,
DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java
)) {
override val migrationResource = "node-core.changelog-master"
}

View File

@ -64,6 +64,10 @@ open class DefaultNamedCacheFactory protected constructor(private val metricRegi
name == "PublicKeyToOwningIdentityCache_cache" -> caffeine.maximumSize(defaultCacheSize)
name == "NodeAttachmentTrustCalculator_trustedKeysCache" -> caffeine.maximumSize(defaultCacheSize)
name == "AttachmentsClassLoader_cache" -> caffeine.maximumSize(defaultAttachmentsClassLoaderCacheSize)
name == "RecoveryPartyInfoCache_byCordaX500Name" -> caffeine.maximumSize(defaultCacheSize)
name == "RecoveryPartyInfoCache_byPartyId" -> caffeine.maximumSize(defaultCacheSize)
name == "DBTransactionRecovery_senderDistributionRecords" -> caffeine.maximumSize(defaultCacheSize)
name == "DBTransactionRecovery_receiverDistributionRecords" -> caffeine.maximumSize(defaultCacheSize)
else -> throw IllegalArgumentException("Unexpected cache name $name. Did you add a new cache?")
}
}

View File

@ -30,6 +30,7 @@
<include file="migration/node-core.changelog-v22.xml"/>
<include file="migration/node-core.changelog-v23.xml"/>
<include file="migration/node-core.changelog-v24.xml"/>
<include file="migration/node-core.changelog-v25.xml"/>
<!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
<include file="migration/vault-schema.changelog-v9.xml"/>

View File

@ -0,0 +1,112 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="remove_flow_metadata_columns">
<dropColumn tableName="node_transactions" columnName="initiator"/>
<dropColumn tableName="node_transactions" columnName="participants"/>
<dropColumn tableName="node_transactions" columnName="states_to_record"/>
</changeSet>
<changeSet author="R3.Corda" id="create_sender_distribution_records_table">
<createTable tableName="node_sender_distribution_records">
<column name="sequence_number" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="tx_id" type="NVARCHAR(144)">
<constraints nullable="false"/>
</column>
<column name="receiver_party_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="states_to_record" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="node_sender_distribution_records_pkey">
<addPrimaryKey columnNames="timestamp, sequence_number" constraintName="node_sender_distribution_records_pkey"
tableName="node_sender_distribution_records"/>
</changeSet>
<changeSet author="R3.Corda" id="node_sender_distribution_records_idx">
<createIndex indexName="node_sender_distribution_records_idx" tableName="node_sender_distribution_records">
<column name="timestamp"/>
<column name="sequence_number"/>
<column name="receiver_party_id"/>
</createIndex>
</changeSet>
<changeSet author="R3.Corda" id="create_receiver_distribution_records_table">
<createTable tableName="node_receiver_distribution_records">
<column name="sequence_number" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="tx_id" type="NVARCHAR(144)">
<constraints nullable="false"/>
</column>
<column name="sender_party_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="distribution_list" type="BLOB">
<constraints nullable="false"/>
</column>
<column name="sender_states_to_record" type="INT">
<constraints nullable="false"/>
</column>
<column name="receiver_states_to_record" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="node_receiver_distribution_records_pkey">
<addPrimaryKey columnNames="timestamp, sequence_number" constraintName="node_receiver_distribution_records_pkey"
tableName="node_receiver_distribution_records"/>
</changeSet>
<changeSet author="R3.Corda" id="node_receiver_distribution_records_idx">
<createIndex indexName="node_receiver_distribution_records_idx" tableName="node_receiver_distribution_records">
<column name="timestamp"/>
<column name="sequence_number"/>
<column name="sender_party_id"/>
</createIndex>
</changeSet>
<changeSet author="R3.Corda" id="create_recovery_party_info_table">
<createTable tableName="node_recovery_party_info">
<column name="party_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="party_name" type="NVARCHAR(255)">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="node_recovery_party_info_pkey">
<addPrimaryKey columnNames="party_id" constraintName="node_recovery_party_info_pkey" tableName="node_recovery_party_info"/>
</changeSet>
<changeSet author="R3.Corda" id="FK__sender_distribution_records__receiver_party_id">
<addForeignKeyConstraint baseColumnNames="receiver_party_id" baseTableName="node_sender_distribution_records"
constraintName="FK__sender_distribution_records__receiver_party_id"
referencedColumnNames="party_id" referencedTableName="node_recovery_party_info"/>
</changeSet>
<changeSet author="R3.Corda" id="FK__receiver_distribution_records__initiator_party_id">
<addForeignKeyConstraint baseColumnNames="sender_party_id" baseTableName="node_receiver_distribution_records"
constraintName="FK__receiver_distribution_records__initiator_party_id"
referencedColumnNames="party_id" referencedTableName="node_recovery_party_info"/>
</changeSet>
</databaseChangeLog>

View File

@ -17,7 +17,7 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StateMachineRunId
@ -801,10 +801,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
return true
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
database.transaction {
records.add(TxRecord.Add(transaction))
delegate.addUnnotarisedTransaction(transaction, metadata)
delegate.addUnnotarisedTransaction(transaction, metadata, isInitiator)
}
return true
}
@ -815,9 +815,9 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
}
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
database.transaction {
delegate.finalizeTransaction(transaction, metadata)
delegate.finalizeTransaction(transaction, metadata, isInitiator)
}
return true
}

View File

@ -0,0 +1,305 @@
package net.corda.node.services.persistence
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.node.NodeInfo
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.CordaClock
import net.corda.node.SimpleClock
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.PersistentPartyInfoCache
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.IN_FLIGHT
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.dummyCommand
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.createWireTransaction
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.internal.MockCryptoService
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.security.KeyPair
import java.time.Clock
import java.time.Instant.now
import java.time.temporal.ChronoUnit
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
class DBTransactionStorageLedgerRecoveryTests {
private companion object {
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB = TestIdentity(BOB_NAME, 80)
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20)
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(inheritable = true)
private lateinit var database: CordaPersistence
private lateinit var transactionRecovery: DBTransactionStorageLedgerRecovery
private lateinit var partyInfoCache: PersistentPartyInfoCache
@Before
fun setUp() {
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, DatabaseConfig(), { null }, { null })
newTransactionRecovery()
}
@After
fun cleanUp() {
database.close()
}
@Test(timeout = 300_000)
fun `query local ledger for transactions with recovery peers within time window`() {
val beforeFirstTxn = now()
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn,
untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow)
assertEquals(1, results.size)
val afterFirstTxn = now()
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size)
assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn)).size)
}
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
assertEquals(1, results.size)
}
@Test(timeout = 300_000)
fun `query local ledger by distribution record type`() {
val transaction1 = newTransaction()
// sender txn
transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(1, it.size)
assertEquals((it[0] as SenderDistributionRecord).peerPartyId, BOB_NAME.hashCode().toLong())
}
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
assertEquals(1, it.size)
assertEquals((it[0] as ReceiverDistributionRecord).initiatorPartyId, BOB_NAME.hashCode().toLong())
}
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
assertEquals(2, resultsAll.size)
}
@Test(timeout = 300_000)
fun `query for sender distribution records by peers`() {
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME, CHARLIE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ONLY_RELEVANT, setOf(ALICE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), true)
assertEquals(5, readSenderDistributionRecordFromDB().size)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let {
assertEquals(2, it.size)
assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE)
assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT)
}
assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `query for receiver distribution records by initiator`() {
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME, CHARLIE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.NONE, setOf(CHARLIE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), false)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
assertEquals(3, it.size)
assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE)
assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT)
assertEquals(it[2].statesToRecord, StatesToRecord.NONE)
}
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size)
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size)
assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `create un-notarised transaction with flow metadata and validate status in db`() {
val senderTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
readSenderDistributionRecordFromDB(senderTransaction.id).let {
assertEquals(1, it.size)
assertEquals(StatesToRecord.ALL_VISIBLE, it[0].statesToRecord)
assertEquals(BOB_NAME, partyInfoCache.getCordaX500NameByPartyId(it[0].peerPartyId))
}
val receiverTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false)
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
readReceiverDistributionRecordFromDB(receiverTransaction.id).let {
assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord)
assertEquals(ALICE_NAME, partyInfoCache.getCordaX500NameByPartyId(it.initiatorPartyId))
assertEquals(setOf(BOB_NAME), it.peerPartyIds.map { partyInfoCache.getCordaX500NameByPartyId(it) }.toSet() )
}
}
@Test(timeout = 300_000)
fun `finalize transaction with recovery metadata`() {
val transaction = newTransaction(notarySig = false)
transactionRecovery.finalizeTransaction(transaction,
TransactionMetadata(ALICE_NAME), false)
assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status)
assertEquals(StatesToRecord.ONLY_RELEVANT, readReceiverDistributionRecordFromDB(transaction.id).statesToRecord)
}
@Test(timeout = 300_000)
fun `remove un-notarised transaction and associated recovery metadata`() {
val senderTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE.name, peers = setOf(BOB.name, CHARLIE_NAME)), true)
assertNull(transactionRecovery.getTransaction(senderTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(senderTransaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(senderTransaction.id).status }
assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size)
assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id))
val receiverTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE.name), false)
assertNull(transactionRecovery.getTransaction(receiverTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(receiverTransaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(receiverTransaction.id).status }
assertFailsWith<AssertionError> { readReceiverDistributionRecordFromDB(receiverTransaction.id) }
assertNull(transactionRecovery.getTransactionInternal(receiverTransaction.id))
}
private fun readTransactionFromDB(id: SecureHash): DBTransactionStorage.DBTransaction {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
DBTransactionStorage.DBTransaction::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
assertEquals(1, fromDb.size)
return fromDb[0]
}
private fun readSenderDistributionRecordFromDB(id: SecureHash? = null): List<SenderDistributionRecord> {
return database.transaction {
if (id != null)
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it.toSenderDistributionRecord() }
else
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java.name}",
DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java
).resultList.map { it.toSenderDistributionRecord() }
}
}
private fun readReceiverDistributionRecordFromDB(id: SecureHash): ReceiverDistributionRecord {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
assertEquals(1, fromDb.size)
return fromDb[0].toReceiverDistributionRecord(MockCryptoService(emptyMap()))
}
private fun newTransactionRecovery(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC()),
cryptoService: CryptoService = MockCryptoService(emptyMap())) {
val networkMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))
val alice = createNodeInfo(listOf(ALICE))
val bob = createNodeInfo(listOf(BOB))
val charlie = createNodeInfo(listOf(CHARLIE))
networkMapCache.addOrUpdateNodes(listOf(alice, bob, charlie))
partyInfoCache = PersistentPartyInfoCache(networkMapCache, TestingNamedCacheFactory(), database)
partyInfoCache.start()
transactionRecovery = DBTransactionStorageLedgerRecovery(database, TestingNamedCacheFactory(cacheSizeBytesOverride
?: 1024), clock, cryptoService, partyInfoCache)
}
private var portCounter = 1000
private fun createNodeInfo(identities: List<TestIdentity>,
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
return NodeInfo(
addresses = listOf(address),
legalIdentitiesAndCerts = identities.map { it.identity },
platformVersion = 3,
serial = 1
)
}
private fun newTransaction(notarySig: Boolean = true): SignedTransaction {
val wtx = createWireTransaction(
inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)),
attachments = emptyList(),
outputs = emptyList(),
commands = listOf(dummyCommand(ALICE.publicKey)),
notary = DUMMY_NOTARY.party,
timeWindow = null
)
return makeSigned(wtx, ALICE.keyPair, notarySig = notarySig)
}
private fun makeSigned(wtx: WireTransaction, vararg keys: KeyPair, notarySig: Boolean = true): SignedTransaction {
val keySigs = keys.map { it.sign(SignableData(wtx.id, SignatureMetadata(1, Crypto.findSignatureScheme(it.public).schemeNumberID))) }
val sigs = if (notarySig) {
keySigs + notarySig(wtx.id)
} else {
keySigs
}
return SignedTransaction(wtx, sigs)
}
private fun notarySig(txId: SecureHash) =
DUMMY_NOTARY.keyPair.sign(SignableData(txId, SignatureMetadata(1, Crypto.findSignatureScheme(DUMMY_NOTARY.publicKey).schemeNumberID)))
}

View File

@ -10,8 +10,7 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.sign
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.node.StatesToRecord
import net.corda.core.flows.TransactionMetadata
import net.corda.core.serialization.deserialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -26,7 +25,6 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
@ -43,7 +41,6 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import rx.plugins.RxJavaHooks
import java.lang.AssertionError
import java.security.KeyPair
import java.time.Clock
import java.time.Instant
@ -58,7 +55,6 @@ import kotlin.test.assertNull
class DBTransactionStorageTests {
private companion object {
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB_PARTY = TestIdentity(BOB_NAME, 80).party
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20)
}
@ -113,24 +109,10 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction()
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
}
@Test(timeout = 300_000)
fun `create un-notarised transaction with flow metadata and validate status in db`() {
val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction()
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name)))
val txn = readTransactionFromDB(transaction.id)
assertEquals(IN_FLIGHT, txn.status)
assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord)
assertEquals(ALICE_NAME.toString(), txn.initiator)
assertEquals(listOf(BOB_NAME.toString()), txn.participants)
}
@Test(timeout = 300_000)
fun `finalize transaction with no prior recording of un-notarised transaction`() {
val now = Instant.ofEpochSecond(333444555L)
@ -150,7 +132,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, emptyList())
@ -160,28 +142,13 @@ class DBTransactionStorageTests {
}
}
@Test(timeout = 300_000)
fun `finalize transaction with recovery metadata`() {
val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.finalizeTransaction(transaction,
FlowTransactionMetadata(ALICE_NAME))
readTransactionFromDB(transaction.id).let {
assertEquals(VERIFIED, it.status)
assertEquals(ALICE_NAME.toString(), it.initiator)
assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord)
}
}
@Test(timeout = 300_000)
fun `finalize transaction with extra signatures after recording transaction as un-notarised`() {
val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
val notarySig = notarySig(transaction.id)
@ -198,7 +165,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
@ -232,7 +199,7 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySig = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, TransactionMetadata(ALICE.party.name), false)
assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySig.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
@ -263,7 +230,7 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySigs = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, FlowTransactionMetadata(ALICE.party.name))
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, TransactionMetadata(ALICE.party.name), false)
assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)

View File

@ -9,7 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.testing.node.MockServices
import rx.Observable
@ -55,7 +55,7 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
}
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null
}
@ -63,7 +63,7 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
return txns.remove(id) != null
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
addTransaction(transaction)
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean {

View File

@ -8,7 +8,7 @@ import net.corda.core.crypto.NullKeys.NULL_SIGNATURE
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowTransactionMetadata
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.notary.NotaryService
@ -139,13 +139,13 @@ data class TestTransactionDSLInterpreter private constructor(
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory())
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) {}
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) {}
override fun removeUnnotarisedTransaction(id: SecureHash) {}
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) {}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) {}
}
private fun copy(): TestTransactionDSLInterpreter =