From 962fdba0f8abce947fc37936c793d30b64963490 Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Fri, 14 Oct 2016 14:49:46 +0100 Subject: [PATCH] Make transaction -> statemachine id mapping persistent. --- .../core/protocols/ProtocolStateMachine.kt | 1 + .../com/r3corda/node/internal/AbstractNode.kt | 2 +- .../DBTransactionMappingStorage.kt | 65 +++++++++++++++++++ .../messaging/TwoPartyTradeProtocolTests.kt | 6 +- 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 node/src/main/kotlin/com/r3corda/node/services/persistence/DBTransactionMappingStorage.kt diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt index fdf30e9be5..c2d94a4599 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt @@ -12,6 +12,7 @@ data class StateMachineRunId private constructor(val uuid: UUID) { companion object { fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID()) + fun wrap(uuid: UUID): StateMachineRunId = StateMachineRunId(uuid) } override fun toString(): String = "[$uuid]" diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 37c0a52909..9805127383 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -451,7 +451,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo // Ensure all required keys exist. obtainKeyPair(configuration.basedir, service.type.id + "-private-key", service.type.id + "-public", service.type.id) } - val stateMachineTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage() + val stateMachineTransactionMappingStorage = DBTransactionMappingStorage() return Pair( constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage), checkpointStorage diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DBTransactionMappingStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DBTransactionMappingStorage.kt new file mode 100644 index 0000000000..e49266fb01 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DBTransactionMappingStorage.kt @@ -0,0 +1,65 @@ +package com.r3corda.node.services.persistence + +import com.r3corda.core.ThreadBox +import com.r3corda.core.bufferUntilSubscribed +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.protocols.StateMachineRunId +import com.r3corda.node.utilities.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement +import rx.Observable +import rx.subjects.PublishSubject +import javax.annotation.concurrent.ThreadSafe + +/** + * Database storage of a txhash -> state machine id mapping. + * + * Mappings are added as transactions are persisted by [ServiceHub.recordTransaction], and never deleted. Used in the + * RPC API to correlate transaction creation with protocols. + * + */ +@ThreadSafe +class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage { + + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transaction_mappings") { + val txId = secureHash("tx_id") + val stateMachineRunId = uuidString("state_machine_run_id") + } + + private class TransactionMappingsMap : AbstractJDBCHashMap(Table, loadOnInit = false) { + override fun keyFromRow(row: ResultRow): SecureHash = row[table.txId] + + override fun valueFromRow(row: ResultRow): StateMachineRunId = StateMachineRunId.wrap(row[table.stateMachineRunId]) + + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.txId] = entry.key + } + + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.stateMachineRunId] = entry.value.uuid + } + } + + private val mutex = ThreadBox(object { + val stateMachineTransactionMap = TransactionMappingsMap() + val updates = PublishSubject.create() + }) + + override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) { + mutex.locked { + stateMachineTransactionMap[transactionId] = stateMachineRunId + updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId)) + } + } + + override fun track(): Pair, Observable> { + mutex.locked { + return Pair( + stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) }, + updates.bufferUntilSubscribed() + ) + } + } +} diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index c035bfa5e2..ec90d92797 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -350,7 +350,11 @@ class TwoPartyTradeProtocolTests { net.runNetwork() // Clear network map registration messages val aliceTxStream = aliceNode.storage.validatedTransactions.track().second - val aliceTxMappings = aliceNode.storage.stateMachineRecordedTransactionMapping.track().second + // TODO: Had to put this temp val here to avoid compiler crash. Put back inside [databaseTransaction] if the compiler stops crashing. + val aliceMappingsStorage = aliceNode.storage.stateMachineRecordedTransactionMapping + val aliceTxMappings = databaseTransaction(aliceNode.database) { + aliceMappingsStorage.track().second + } val aliceSmId = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerId net.runNetwork()