Persisting transactions to disk

This commit is contained in:
Shams Asari 2016-06-07 11:18:48 +01:00
parent f6e577f672
commit 2365f9bca5
15 changed files with 262 additions and 158 deletions

View File

@ -1,9 +1,7 @@
package com.r3corda.core.contracts package com.r3corda.core.contracts
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.contracts.CommandData
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.TransactionStorage
import java.util.* import java.util.*
import java.util.concurrent.Callable import java.util.concurrent.Callable
@ -17,7 +15,7 @@ import java.util.concurrent.Callable
* *
* TODO: Write unit tests for this. * TODO: Write unit tests for this.
*/ */
class TransactionGraphSearch(val transactions: Map<SecureHash, SignedTransaction>, class TransactionGraphSearch(val transactions: TransactionStorage,
val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> { val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> {
class Query( class Query(
val withCommandOfType: Class<out CommandData>? = null val withCommandOfType: Class<out CommandData>? = null
@ -35,7 +33,7 @@ class TransactionGraphSearch(val transactions: Map<SecureHash, SignedTransaction
while (next.isNotEmpty()) { while (next.isNotEmpty()) {
val hash = next.removeAt(next.lastIndex) val hash = next.removeAt(next.lastIndex)
val tx = transactions[hash]?.tx ?: continue val tx = transactions.getTransaction(hash)?.tx ?: continue
if (q.matches(tx)) if (q.matches(tx))
results += tx results += tx

View File

@ -1,10 +1,8 @@
package com.r3corda.core.node package com.r3corda.core.node
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.* import com.r3corda.core.node.services.*
import com.r3corda.core.utilities.RecordingMap
import java.time.Clock import java.time.Clock
/** /**
@ -31,7 +29,7 @@ interface ServiceHub {
*/ */
fun verifyTransaction(ltx: LedgerTransaction) { fun verifyTransaction(ltx: LedgerTransaction) {
val dependencies = ltx.inputs.map { val dependencies = ltx.inputs.map {
storageService.validatedTransactions[it.txhash] ?: throw TransactionResolutionException(it.txhash) storageService.validatedTransactions.getTransaction(it.txhash) ?: throw TransactionResolutionException(it.txhash)
} }
val ltxns = dependencies.map { it.verifyToLedgerTransaction(identityService, storageService.attachments) } val ltxns = dependencies.map { it.verifyToLedgerTransaction(identityService, storageService.attachments) }
TransactionGroup(setOf(ltx), ltxns.toSet()).verify() TransactionGroup(setOf(ltx), ltxns.toSet()).verify()
@ -42,18 +40,11 @@ interface ServiceHub {
* sends them to the wallet for further processing. * sends them to the wallet for further processing.
* *
* TODO: Need to come up with a way for preventing transactions being written other than by this method. * TODO: Need to come up with a way for preventing transactions being written other than by this method.
* TODO: RecordingMap is test infrastructure. Refactor it away or find a way to ensure it's only used in tests.
* *
* @param txs The transactions to record * @param txs The transactions to record
* @param skipRecordingMap This is used in unit testing and can be ignored most of the time.
*/ */
fun recordTransactions(txs: List<SignedTransaction>, skipRecordingMap: Boolean = false) { fun recordTransactions(txs: List<SignedTransaction>) {
val txns: Map<SecureHash, SignedTransaction> = txs.groupBy { it.id }.mapValues { it.value.first() } txs.forEach { storageService.validatedTransactions.addTransaction(it) }
val txStorage = storageService.validatedTransactions
if (txStorage is RecordingMap && skipRecordingMap)
txStorage.putAllUnrecorded(txns)
else
txStorage.putAll(txns)
walletService.notifyAll(txs.map { it.tx }) walletService.notifyAll(txs.map { it.tx })
} }
@ -63,7 +54,7 @@ interface ServiceHub {
* @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction * @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction
*/ */
fun loadState(stateRef: StateRef): ContractState { fun loadState(stateRef: StateRef): ContractState {
val definingTx = storageService.validatedTransactions[stateRef.txhash] ?: throw TransactionResolutionException(stateRef.txhash) val definingTx = storageService.validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)
return definingTx.tx.outputs[stateRef.index] return definingTx.tx.outputs[stateRef.index]
} }
} }

View File

@ -3,7 +3,7 @@ package com.r3corda.core.node.services
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.TransactionStorage
import java.security.KeyPair import java.security.KeyPair
import java.security.PrivateKey import java.security.PrivateKey
import java.security.PublicKey import java.security.PublicKey
@ -123,7 +123,7 @@ interface StorageService {
* The signatures aren't technically needed after that point, but we keep them around so that we can relay * The signatures aren't technically needed after that point, but we keep them around so that we can relay
* the transaction data to other nodes that need it. * the transaction data to other nodes that need it.
*/ */
val validatedTransactions: MutableMap<SecureHash, SignedTransaction> val validatedTransactions: TransactionStorage
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage val attachments: AttachmentStorage

View File

@ -0,0 +1,22 @@
package com.r3corda.core.node.services
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.SecureHash
/**
* Thread-safe storage of transactions.
*/
interface TransactionStorage {
/**
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten.
*/
fun addTransaction(transaction: SignedTransaction)
/**
* Return the transaction with the given [id], or null if no such transaction exists.
*/
fun getTransaction(id: SecureHash): SignedTransaction?
}

View File

@ -6,13 +6,8 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.crypto.sha256 import com.r3corda.core.crypto.sha256
import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.node.services.StorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.RecordingMap
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.File import java.io.File
@ -82,33 +77,17 @@ class MockAttachmentStorage : AttachmentStorage {
} }
} }
class MockTransactionStorage : TransactionStorage {
private val txns = HashMap<SecureHash, SignedTransaction>()
override fun addTransaction(transaction: SignedTransaction) {
txns[transaction.id] = transaction
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]
}
@ThreadSafe @ThreadSafe
class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(), class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(),
override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
override val myLegalIdentityKey: KeyPair = generateKeyPair(), override val myLegalIdentityKey: KeyPair = generateKeyPair(),
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
// This parameter is for unit tests that want to observe operation details. : SingletonSerializeAsToken(), StorageService
val recordingAs: (String) -> String = { tableName -> "" })
: SingletonSerializeAsToken(), StorageService {
protected val tables = HashMap<String, MutableMap<*, *>>()
private fun <K, V> getMapOriginal(tableName: String): MutableMap<K, V> {
synchronized(tables) {
@Suppress("UNCHECKED_CAST")
return tables.getOrPut(tableName) {
recorderWrap(Collections.synchronizedMap(HashMap<K, V>()), tableName)
} as MutableMap<K, V>
}
}
private fun <K, V> recorderWrap(map: MutableMap<K, V>, tableName: String): MutableMap<K, V> {
if (recordingAs(tableName) != "")
return RecordingMap(map, LoggerFactory.getLogger("recordingmap.${recordingAs(tableName)}"))
else
return map
}
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMapOriginal("validated-transactions")
}

View File

@ -18,6 +18,6 @@ class FetchTransactionsProtocol(requests: Set<SecureHash>, otherSide: SingleMess
const val TOPIC = "platform.fetch.tx" const val TOPIC = "platform.fetch.tx"
} }
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions[txid] override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions.getTransaction(txid)
override val queryTopic: String = TOPIC override val queryTopic: String = TOPIC
} }

View File

@ -28,10 +28,7 @@ import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.services.persistence.DataVendingService import com.r3corda.node.services.persistence.*
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.NotaryService
@ -257,13 +254,17 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
protected open fun initialiseStorageService(dir: Path): Pair<StorageService, CheckpointStorage> { protected open fun initialiseStorageService(dir: Path): Pair<StorageService, CheckpointStorage> {
val attachments = makeAttachmentStorage(dir) val attachments = makeAttachmentStorage(dir)
val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints")) val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints"))
val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions"))
_servicesThatAcceptUploads += attachments _servicesThatAcceptUploads += attachments
val (identity, keypair) = obtainKeyPair(dir) val (identity, keypair) = obtainKeyPair(dir)
return Pair(constructStorageService(attachments, keypair, identity), checkpointStorage) return Pair(constructStorageService(attachments, transactionStorage, keypair, identity),checkpointStorage)
} }
protected open fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party) = protected open fun constructStorageService(attachments: NodeAttachmentService,
StorageServiceImpl(attachments, keypair, identity) transactionStorage: TransactionStorage,
keypair: KeyPair,
identity: Party) =
StorageServiceImpl(attachments, transactionStorage, keypair, identity)
private fun obtainKeyPair(dir: Path): Pair<Party, KeyPair> { private fun obtainKeyPair(dir: Path): Pair<Party, KeyPair> {
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that // Load the private identity key, creating it if necessary. The identity key is a long term well known key that

View File

@ -36,7 +36,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
tx.signWith(seller.storage.myLegalIdentityKey) tx.signWith(seller.storage.myLegalIdentityKey)
tx.toSignedTransaction(true) tx.toSignedTransaction(true)
} }
seller.services.storageService.validatedTransactions[issuance.id] = issuance seller.services.storageService.validatedTransactions.addTransaction(issuance)
val sessionID = random63BitValue() val sessionID = random63BitValue()
val buyerProtocol = TwoPartyTradeProtocol.Buyer(seller.net.myAddress, notary.info.identity, val buyerProtocol = TwoPartyTradeProtocol.Buyer(seller.net.myAddress, notary.info.identity,

View File

@ -3,8 +3,8 @@ package com.r3corda.node.services.persistence
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.StorageService import com.r3corda.core.node.services.StorageService
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.protocols.FetchAttachmentsProtocol import com.r3corda.protocols.FetchAttachmentsProtocol
import com.r3corda.protocols.FetchDataProtocol import com.r3corda.protocols.FetchDataProtocol
import com.r3corda.protocols.FetchTransactionsProtocol import com.r3corda.protocols.FetchTransactionsProtocol
@ -43,7 +43,7 @@ class DataVendingService(net: MessagingService, private val storage: StorageServ
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> { private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
require(req.hashes.isNotEmpty()) require(req.hashes.isNotEmpty())
return req.hashes.map { return req.hashes.map {
val tx = storage.validatedTransactions[it] val tx = storage.validatedTransactions.getTransaction(it)
if (tx == null) if (tx == null)
logger.info("Got request for unknown tx $it") logger.info("Got request for unknown tx $it")
tx tx

View File

@ -0,0 +1,48 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
/**
* File-based transaction storage, storing transactions per file.
*/
@ThreadSafe
class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
companion object {
private val logger = loggerFor<PerFileCheckpointStorage>()
private val fileExtension = ".transaction"
}
private val _transactions = ConcurrentHashMap<SecureHash, SignedTransaction>()
init {
logger.trace { "Initialising per file transaction storage on $storeDir" }
Files.createDirectories(storeDir)
Files.list(storeDir)
.filter { it.toString().toLowerCase().endsWith(fileExtension) }
.map { Files.readAllBytes(it).deserialize<SignedTransaction>() }
.forEach { _transactions[it.id] = it }
}
override fun addTransaction(transaction: SignedTransaction) {
val transactionFile = storeDir.resolve("${transaction.id.toString().toLowerCase()}$fileExtension")
transaction.serialize().writeToFile(transactionFile)
_transactions[transaction.id] = transaction
logger.trace { "Stored $transaction to $transactionFile" }
}
override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id]
val transactions: Iterable<SignedTransaction> get() = _transactions.values.toList()
}

View File

@ -1,41 +1,14 @@
package com.r3corda.node.services.persistence package com.r3corda.node.services.persistence
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.StorageService import com.r3corda.core.node.services.StorageService
import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.RecordingMap
import org.slf4j.LoggerFactory
import java.security.KeyPair import java.security.KeyPair
import java.util.*
open class StorageServiceImpl(override val attachments: AttachmentStorage, open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage,
override val myLegalIdentityKey: KeyPair, override val myLegalIdentityKey: KeyPair,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
// This parameter is for unit tests that want to observe operation details. : SingletonSerializeAsToken(), StorageService
val recordingAs: (String) -> String = { tableName -> "" })
: SingletonSerializeAsToken(), StorageService {
protected val tables = HashMap<String, MutableMap<*, *>>()
private fun <K, V> getMapOriginal(tableName: String): MutableMap<K, V> {
synchronized(tables) {
@Suppress("UNCHECKED_CAST")
return tables.getOrPut(tableName) {
recorderWrap(Collections.synchronizedMap(HashMap<K, V>()), tableName)
} as MutableMap<K, V>
}
}
private fun <K, V> recorderWrap(map: MutableMap<K, V>, tableName: String): MutableMap<K, V> {
if (recordingAs(tableName) != "")
return RecordingMap(map, LoggerFactory.getLogger("recordingmap.${recordingAs(tableName)}"))
else
return map
}
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMapOriginal("validated-transactions")
}

View File

@ -1,8 +1,8 @@
package com.r3corda.node.messaging package com.r3corda.node.messaging
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.contracts.cash.Cash
import com.r3corda.contracts.CommercialPaper import com.r3corda.contracts.CommercialPaper
import com.r3corda.contracts.cash.Cash
import com.r3corda.contracts.testing.CASH import com.r3corda.contracts.testing.CASH
import com.r3corda.contracts.testing.`issued by` import com.r3corda.contracts.testing.`issued by`
import com.r3corda.contracts.testing.`owned by` import com.r3corda.contracts.testing.`owned by`
@ -14,16 +14,17 @@ import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.node.services.Wallet import com.r3corda.core.node.services.Wallet
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.testing.* import com.r3corda.core.testing.*
import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.core.utilities.RecordingMap
import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.persistence.PerFileTransactionStorage
import com.r3corda.node.services.persistence.StorageServiceImpl import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService import com.r3corda.node.services.wallet.NodeWalletService
@ -38,13 +39,12 @@ import java.io.ByteArrayOutputStream
import java.nio.file.Path import java.nio.file.Path
import java.security.KeyPair import java.security.KeyPair
import java.security.PublicKey import java.security.PublicKey
import java.util.Currency import java.util.*
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.jar.JarOutputStream import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry import java.util.zip.ZipEntry
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue import kotlin.test.assertTrue
/** /**
@ -186,15 +186,15 @@ class TwoPartyTradeProtocolTests {
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
assertThat(bobNode.checkpointStorage.checkpoints).hasSize(1) assertThat(bobNode.checkpointStorage.checkpoints).hasSize(1)
// TODO: remove once validated transactions are persisted to disk val bobTransactionsBeforeCrash = (bobNode.storage.validatedTransactions as PerFileTransactionStorage).transactions
val recordedTransactions = bobNode.storage.validatedTransactions assertThat(bobTransactionsBeforeCrash).isNotEmpty()
// .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk. // .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk.
bobNode.stop() bobNode.stop()
// Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use. // Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use.
// She will wait around until Bob comes back. // She will wait around until Bob comes back.
assertNotNull(pumpAlice()) assertThat(pumpAlice()).isNotNull()
// ... bring the node back up ... the act of constructing the SMM will re-register the message handlers // ... bring the node back up ... the act of constructing the SMM will re-register the message handlers
// that Bob was waiting on before the reboot occurred. // that Bob was waiting on before the reboot occurred.
@ -205,9 +205,6 @@ class TwoPartyTradeProtocolTests {
} }
}, true, BOB.name, BOB_KEY) }, true, BOB.name, BOB_KEY)
// TODO: remove once validated transactions are persisted to disk
bobNode.storage.validatedTransactions.putAll(recordedTransactions)
// Find the future representing the result of this state machine again. // Find the future representing the result of this state machine again.
val bobFuture = bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java).single().second val bobFuture = bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java).single().second
@ -215,12 +212,15 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() net.runNetwork()
// Bob is now finished and has the same transaction as Alice. // Bob is now finished and has the same transaction as Alice.
assertEquals(bobFuture.get(), aliceFuture.get()) assertThat(bobFuture.get()).isEqualTo(aliceFuture.get())
assertThat(bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java)).isEmpty() assertThat(bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java)).isEmpty()
assertThat(bobNode.checkpointStorage.checkpoints).isEmpty() assertThat(bobNode.checkpointStorage.checkpoints).isEmpty()
assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty() assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty()
val restoredBobTransactions = bobTransactionsBeforeCrash.filter { bobNode.storage.validatedTransactions.getTransaction(it.id) != null }
assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash)
} }
} }
@ -233,9 +233,11 @@ class TwoPartyTradeProtocolTests {
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair) { return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair) {
// That constructs the storage service object in a customised way ... // That constructs the storage service object in a customised way ...
override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl { override fun constructStorageService(attachments: NodeAttachmentService,
// To use RecordingMaps instead of ordinary HashMaps. transactionStorage: TransactionStorage,
return StorageServiceImpl(attachments, keypair, identity, { tableName -> name }) keypair: KeyPair,
identity: Party): StorageServiceImpl {
return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), keypair, identity)
} }
} }
} }
@ -243,7 +245,7 @@ class TwoPartyTradeProtocolTests {
} }
@Test @Test
fun checkDependenciesOfSaleAssetAreResolved() { fun `check dependencies of sale asset are resolved`() {
transactionGroupFor<ContractState> { transactionGroupFor<ContractState> {
val notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY) val notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
val aliceNode = makeNodeWithTracking(notaryNode.info, ALICE.name, ALICE_KEY) val aliceNode = makeNodeWithTracking(notaryNode.info, ALICE.name, ALICE_KEY)
@ -289,22 +291,21 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() net.runNetwork()
run { run {
val records = (bobNode.storage.validatedTransactions as RecordingMap).records val records = (bobNode.storage.validatedTransactions as RecordingTransactionStorage).records
// Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice. // Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice.
val expected = listOf( assertThat(records).containsExactly(
// Buyer Bob is told about Alice's commercial paper, but doesn't know it .. // Buyer Bob is told about Alice's commercial paper, but doesn't know it ..
RecordingMap.Get(alicesFakePaper[0].id), TxRecord.Get(alicesFakePaper[0].id),
// He asks and gets the tx, validates it, sees it's a self issue with no dependencies, stores. // He asks and gets the tx, validates it, sees it's a self issue with no dependencies, stores.
RecordingMap.Put(alicesFakePaper[0].id, alicesSignedTxns.values.first()), TxRecord.Add(alicesSignedTxns.values.first()),
// Alice gets Bob's proposed transaction and doesn't know his two cash states. She asks, Bob answers. // Alice gets Bob's proposed transaction and doesn't know his two cash states. She asks, Bob answers.
RecordingMap.Get(bobsFakeCash[1].id), TxRecord.Get(bobsFakeCash[1].id),
RecordingMap.Get(bobsFakeCash[2].id), TxRecord.Get(bobsFakeCash[2].id),
// Alice notices that Bob's cash txns depend on a third tx she also doesn't know. She asks, Bob answers. // Alice notices that Bob's cash txns depend on a third tx she also doesn't know. She asks, Bob answers.
RecordingMap.Get(bobsFakeCash[0].id), TxRecord.Get(bobsFakeCash[0].id),
// Bob wants to verify that the tx has been signed by the correct Notary, which requires looking up an input state // Bob wants to verify that the tx has been signed by the correct Notary, which requires looking up an input state
RecordingMap.Get(bobsFakeCash[1].id) TxRecord.Get(bobsFakeCash[1].id)
) )
assertEquals(expected, records)
// Bob has downloaded the attachment. // Bob has downloaded the attachment.
bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use { bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use {
@ -316,33 +317,32 @@ class TwoPartyTradeProtocolTests {
// And from Alice's perspective ... // And from Alice's perspective ...
run { run {
val records = (aliceNode.storage.validatedTransactions as RecordingMap).records val records = (aliceNode.storage.validatedTransactions as RecordingTransactionStorage).records
val expected = listOf( assertThat(records).containsExactly(
// Seller Alice sends her seller info to Bob, who wants to check the asset for sale. // Seller Alice sends her seller info to Bob, who wants to check the asset for sale.
// He requests, Alice looks up in her DB to send the tx to Bob // He requests, Alice looks up in her DB to send the tx to Bob
RecordingMap.Get(alicesFakePaper[0].id), TxRecord.Get(alicesFakePaper[0].id),
// Seller Alice gets a proposed tx which depends on Bob's two cash txns and her own tx. // Seller Alice gets a proposed tx which depends on Bob's two cash txns and her own tx.
RecordingMap.Get(bobsFakeCash[1].id), TxRecord.Get(bobsFakeCash[1].id),
RecordingMap.Get(bobsFakeCash[2].id), TxRecord.Get(bobsFakeCash[2].id),
RecordingMap.Get(alicesFakePaper[0].id), TxRecord.Get(alicesFakePaper[0].id),
// Alice notices that Bob's cash txns depend on a third tx she also doesn't know. // Alice notices that Bob's cash txns depend on a third tx she also doesn't know.
RecordingMap.Get(bobsFakeCash[0].id), TxRecord.Get(bobsFakeCash[0].id),
// Bob answers with the transactions that are now all verifiable, as Alice bottomed out. // Bob answers with the transactions that are now all verifiable, as Alice bottomed out.
// Bob's transactions are valid, so she commits to the database // Bob's transactions are valid, so she commits to the database
RecordingMap.Put(bobsFakeCash[1].id, bobsSignedTxns[bobsFakeCash[1].id]), TxRecord.Add(bobsSignedTxns[bobsFakeCash[1].id]!!),
RecordingMap.Put(bobsFakeCash[2].id, bobsSignedTxns[bobsFakeCash[2].id]), TxRecord.Add(bobsSignedTxns[bobsFakeCash[2].id]!!),
RecordingMap.Put(bobsFakeCash[0].id, bobsSignedTxns[bobsFakeCash[0].id]), TxRecord.Add(bobsSignedTxns[bobsFakeCash[0].id]!!),
// Now she verifies the transaction is contract-valid (not signature valid) which means // Now she verifies the transaction is contract-valid (not signature valid) which means
// looking up the states again. // looking up the states again.
RecordingMap.Get(bobsFakeCash[1].id), TxRecord.Get(bobsFakeCash[1].id),
RecordingMap.Get(bobsFakeCash[2].id), TxRecord.Get(bobsFakeCash[2].id),
RecordingMap.Get(alicesFakePaper[0].id), TxRecord.Get(alicesFakePaper[0].id),
// Alice needs to look up the input states to find out which Notary they point to // Alice needs to look up the input states to find out which Notary they point to
RecordingMap.Get(bobsFakeCash[1].id), TxRecord.Get(bobsFakeCash[1].id),
RecordingMap.Get(bobsFakeCash[2].id), TxRecord.Get(bobsFakeCash[2].id),
RecordingMap.Get(alicesFakePaper[0].id) TxRecord.Get(alicesFakePaper[0].id)
) )
assertEquals(expected, records)
} }
} }
} }
@ -415,7 +415,11 @@ class TwoPartyTradeProtocolTests {
services: ServiceHub, services: ServiceHub,
vararg extraKeys: KeyPair): Map<SecureHash, SignedTransaction> { vararg extraKeys: KeyPair): Map<SecureHash, SignedTransaction> {
val signed: List<SignedTransaction> = signAll(wtxToSign, *extraKeys) val signed: List<SignedTransaction> = signAll(wtxToSign, *extraKeys)
services.recordTransactions(signed, skipRecordingMap = true) services.recordTransactions(signed)
val validatedTransactions = services.storageService.validatedTransactions
if (validatedTransactions is RecordingTransactionStorage) {
validatedTransactions.records.clear()
}
return signed.associateBy { it.id } return signed.associateBy { it.id }
} }
@ -465,4 +469,27 @@ class TwoPartyTradeProtocolTests {
val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("alice's paper"))) val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("alice's paper")))
return Pair(wallet, listOf(ap)) return Pair(wallet, listOf(ap))
} }
class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage {
val records = Collections.synchronizedList(ArrayList<TxRecord>())
override fun addTransaction(transaction: SignedTransaction) {
records.add(TxRecord.Add(transaction))
delegate.addTransaction(transaction)
}
override fun getTransaction(id: SecureHash): SignedTransaction? {
records.add(TxRecord.Get(id))
return delegate.getTransaction(id)
}
}
interface TxRecord {
data class Add(val transaction: SignedTransaction) : TxRecord
data class Get(val id: SecureHash): TxRecord
}
} }

View File

@ -6,8 +6,6 @@ import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.MockNetworkMapCache
@ -15,23 +13,6 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DataVendingService import com.r3corda.node.services.persistence.DataVendingService
import com.r3corda.node.services.wallet.NodeWalletService import com.r3corda.node.services.wallet.NodeWalletService
import java.time.Clock import java.time.Clock
import java.util.concurrent.ConcurrentLinkedQueue
class MockCheckpointStorage : CheckpointStorage {
private val _checkpoints = ConcurrentLinkedQueue<Checkpoint>()
override val checkpoints: Iterable<Checkpoint>
get() = _checkpoints.toList()
override fun addCheckpoint(checkpoint: Checkpoint) {
_checkpoints.add(checkpoint)
}
override fun removeCheckpoint(checkpoint: Checkpoint) {
require(_checkpoints.remove(checkpoint))
}
}
class MockServices( class MockServices(
customWallet: WalletService? = null, customWallet: WalletService? = null,

View File

@ -1,11 +1,10 @@
package com.r3corda.node.services package com.r3corda.node.services.persistence
import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints import com.google.common.primitives.Ints
import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After import org.junit.After

View File

@ -0,0 +1,85 @@
package com.r3corda.node.services.persistence
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.NullPublicKey
import com.r3corda.core.serialization.SerializedBytes
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.nio.file.Files
class PerFileTransactionStorageTests {
val fileSystem = Jimfs.newFileSystem(unix())
val storeDir = fileSystem.getPath("store")
lateinit var transactionStorage: PerFileTransactionStorage
@Before
fun setUp() {
newTransactionStorage()
}
@After
fun cleanUp() {
fileSystem.close()
}
@Test
fun `empty store`() {
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
assertThat(transactionStorage.transactions).isEmpty()
newTransactionStorage()
assertThat(transactionStorage.transactions).isEmpty()
}
@Test
fun `one transaction`() {
val transaction = newTransaction()
transactionStorage.addTransaction(transaction)
assertTransactionIsRetrievable(transaction)
assertThat(transactionStorage.transactions).containsExactly(transaction)
newTransactionStorage()
assertTransactionIsRetrievable(transaction)
assertThat(transactionStorage.transactions).containsExactly(transaction)
}
@Test
fun `two transactions across restart`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
transactionStorage.addTransaction(firstTransaction)
newTransactionStorage()
transactionStorage.addTransaction(secondTransaction)
assertTransactionIsRetrievable(firstTransaction)
assertTransactionIsRetrievable(secondTransaction)
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
@Test
fun `non-transaction files are ignored`() {
val transactions = newTransaction()
transactionStorage.addTransaction(transactions)
Files.write(storeDir.resolve("random-non-tx-file"), "this is not a transaction!!".toByteArray())
newTransactionStorage()
assertThat(transactionStorage.transactions).containsExactly(transactions)
}
private fun newTransactionStorage() {
transactionStorage = PerFileTransactionStorage(storeDir)
}
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
}
private var txCount = 0
private fun newTransaction() = SignedTransaction(
SerializedBytes(Ints.toByteArray(++txCount)),
listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1))))
}