Make transaction -> statemachine id mapping persistent.

This commit is contained in:
rick.parker 2016-10-14 14:49:46 +01:00
parent d7ca215f7d
commit 962fdba0f8
4 changed files with 72 additions and 2 deletions

View File

@ -12,6 +12,7 @@ data class StateMachineRunId private constructor(val uuid: UUID) {
companion object { companion object {
fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID()) fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID())
fun wrap(uuid: UUID): StateMachineRunId = StateMachineRunId(uuid)
} }
override fun toString(): String = "[$uuid]" override fun toString(): String = "[$uuid]"

View File

@ -451,7 +451,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// Ensure all required keys exist. // Ensure all required keys exist.
obtainKeyPair(configuration.basedir, service.type.id + "-private-key", service.type.id + "-public", service.type.id) obtainKeyPair(configuration.basedir, service.type.id + "-private-key", service.type.id + "-public", service.type.id)
} }
val stateMachineTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage() val stateMachineTransactionMappingStorage = DBTransactionMappingStorage()
return Pair( return Pair(
constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage), constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage),
checkpointStorage checkpointStorage

View File

@ -0,0 +1,65 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.ThreadBox
import com.r3corda.core.bufferUntilSubscribed
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.Observable
import rx.subjects.PublishSubject
import javax.annotation.concurrent.ThreadSafe
/**
* Database storage of a txhash -> state machine id mapping.
*
* Mappings are added as transactions are persisted by [ServiceHub.recordTransaction], and never deleted. Used in the
* RPC API to correlate transaction creation with protocols.
*
*/
@ThreadSafe
class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transaction_mappings") {
val txId = secureHash("tx_id")
val stateMachineRunId = uuidString("state_machine_run_id")
}
private class TransactionMappingsMap : AbstractJDBCHashMap<SecureHash, StateMachineRunId, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): SecureHash = row[table.txId]
override fun valueFromRow(row: ResultRow): StateMachineRunId = StateMachineRunId.wrap(row[table.stateMachineRunId])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, StateMachineRunId>, finalizables: MutableList<() -> Unit>) {
insert[table.txId] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, StateMachineRunId>, finalizables: MutableList<() -> Unit>) {
insert[table.stateMachineRunId] = entry.value.uuid
}
}
private val mutex = ThreadBox(object {
val stateMachineTransactionMap = TransactionMappingsMap()
val updates = PublishSubject.create<StateMachineTransactionMapping>()
})
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
mutex.locked {
stateMachineTransactionMap[transactionId] = stateMachineRunId
updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}
override fun track(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
mutex.locked {
return Pair(
stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) },
updates.bufferUntilSubscribed()
)
}
}
}

View File

@ -350,7 +350,11 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() // Clear network map registration messages net.runNetwork() // Clear network map registration messages
val aliceTxStream = aliceNode.storage.validatedTransactions.track().second val aliceTxStream = aliceNode.storage.validatedTransactions.track().second
val aliceTxMappings = aliceNode.storage.stateMachineRecordedTransactionMapping.track().second // TODO: Had to put this temp val here to avoid compiler crash. Put back inside [databaseTransaction] if the compiler stops crashing.
val aliceMappingsStorage = aliceNode.storage.stateMachineRecordedTransactionMapping
val aliceTxMappings = databaseTransaction(aliceNode.database) {
aliceMappingsStorage.track().second
}
val aliceSmId = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerId val aliceSmId = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerId
net.runNetwork() net.runNetwork()