diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt index 244cabf4a6..d43e69ebec 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/TransactionGraphSearch.kt @@ -1,9 +1,7 @@ package com.r3corda.core.contracts -import com.r3corda.core.contracts.SignedTransaction -import com.r3corda.core.contracts.WireTransaction -import com.r3corda.core.contracts.CommandData import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.TransactionStorage import java.util.* import java.util.concurrent.Callable @@ -17,7 +15,7 @@ import java.util.concurrent.Callable * * TODO: Write unit tests for this. */ -class TransactionGraphSearch(val transactions: Map, +class TransactionGraphSearch(val transactions: TransactionStorage, val startPoints: List) : Callable> { class Query( val withCommandOfType: Class? = null @@ -35,7 +33,7 @@ class TransactionGraphSearch(val transactions: Map, skipRecordingMap: Boolean = false) { - val txns: Map = txs.groupBy { it.id }.mapValues { it.value.first() } - val txStorage = storageService.validatedTransactions - if (txStorage is RecordingMap && skipRecordingMap) - txStorage.putAllUnrecorded(txns) - else - txStorage.putAll(txns) + fun recordTransactions(txs: List) { + txs.forEach { storageService.validatedTransactions.addTransaction(it) } walletService.notifyAll(txs.map { it.tx }) } @@ -63,7 +54,7 @@ interface ServiceHub { * @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction */ fun loadState(stateRef: StateRef): ContractState { - val definingTx = storageService.validatedTransactions[stateRef.txhash] ?: throw TransactionResolutionException(stateRef.txhash) + val definingTx = storageService.validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash) return definingTx.tx.outputs[stateRef.index] } } \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 0535668b86..34a379b565 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -3,7 +3,7 @@ package com.r3corda.core.node.services import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.node.services.AttachmentStorage +import com.r3corda.core.node.services.TransactionStorage import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -123,7 +123,7 @@ interface StorageService { * The signatures aren't technically needed after that point, but we keep them around so that we can relay * the transaction data to other nodes that need it. */ - val validatedTransactions: MutableMap + val validatedTransactions: TransactionStorage /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ val attachments: AttachmentStorage diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt new file mode 100644 index 0000000000..80b61f33fc --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt @@ -0,0 +1,22 @@ +package com.r3corda.core.node.services + +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.SecureHash + +/** + * Thread-safe storage of transactions. + */ +interface TransactionStorage { + + /** + * Add a new transaction to the store. If the store already has a transaction with the same id it will be + * overwritten. + */ + fun addTransaction(transaction: SignedTransaction) + + /** + * Return the transaction with the given [id], or null if no such transaction exists. + */ + fun getTransaction(id: SecureHash): SignedTransaction? + +} diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index 54647fca4a..db90aa5eef 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -6,13 +6,8 @@ import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.sha256 -import com.r3corda.core.node.services.AttachmentStorage -import com.r3corda.core.node.services.IdentityService -import com.r3corda.core.node.services.KeyManagementService -import com.r3corda.core.node.services.StorageService +import com.r3corda.core.node.services.* import com.r3corda.core.serialization.SingletonSerializeAsToken -import com.r3corda.core.utilities.RecordingMap -import org.slf4j.LoggerFactory import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.File @@ -82,33 +77,17 @@ class MockAttachmentStorage : AttachmentStorage { } } +class MockTransactionStorage : TransactionStorage { + private val txns = HashMap() + override fun addTransaction(transaction: SignedTransaction) { + txns[transaction.id] = transaction + } + override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id] +} @ThreadSafe class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(), + override val validatedTransactions: TransactionStorage = MockTransactionStorage(), override val myLegalIdentityKey: KeyPair = generateKeyPair(), - override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), -// This parameter is for unit tests that want to observe operation details. - val recordingAs: (String) -> String = { tableName -> "" }) -: SingletonSerializeAsToken(), StorageService { - protected val tables = HashMap>() - - private fun getMapOriginal(tableName: String): MutableMap { - synchronized(tables) { - @Suppress("UNCHECKED_CAST") - return tables.getOrPut(tableName) { - recorderWrap(Collections.synchronizedMap(HashMap()), tableName) - } as MutableMap - } - } - - private fun recorderWrap(map: MutableMap, tableName: String): MutableMap { - if (recordingAs(tableName) != "") - return RecordingMap(map, LoggerFactory.getLogger("recordingmap.${recordingAs(tableName)}")) - else - return map - } - - override val validatedTransactions: MutableMap - get() = getMapOriginal("validated-transactions") - -} + override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) +: SingletonSerializeAsToken(), StorageService diff --git a/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt index 864aeb7562..032fa9b583 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt @@ -18,6 +18,6 @@ class FetchTransactionsProtocol(requests: Set, otherSide: SingleMess const val TOPIC = "platform.fetch.tx" } - override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions[txid] + override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions.getTransaction(txid) override val queryTopic: String = TOPIC } \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 364a6a8966..bc00eb2b94 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -28,10 +28,7 @@ import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NodeRegistration -import com.r3corda.node.services.persistence.DataVendingService -import com.r3corda.node.services.persistence.NodeAttachmentService -import com.r3corda.node.services.persistence.PerFileCheckpointStorage -import com.r3corda.node.services.persistence.StorageServiceImpl +import com.r3corda.node.services.persistence.* import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.transactions.InMemoryUniquenessProvider import com.r3corda.node.services.transactions.NotaryService @@ -257,13 +254,17 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, protected open fun initialiseStorageService(dir: Path): Pair { val attachments = makeAttachmentStorage(dir) val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints")) + val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions")) _servicesThatAcceptUploads += attachments val (identity, keypair) = obtainKeyPair(dir) - return Pair(constructStorageService(attachments, keypair, identity), checkpointStorage) + return Pair(constructStorageService(attachments, transactionStorage, keypair, identity),checkpointStorage) } - protected open fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party) = - StorageServiceImpl(attachments, keypair, identity) + protected open fun constructStorageService(attachments: NodeAttachmentService, + transactionStorage: TransactionStorage, + keypair: KeyPair, + identity: Party) = + StorageServiceImpl(attachments, transactionStorage, keypair, identity) private fun obtainKeyPair(dir: Path): Pair { // Load the private identity key, creating it if necessary. The identity key is a long term well known key that @@ -307,4 +308,4 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, } return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics) } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt index c8b209db25..6c5a8c6403 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt @@ -36,7 +36,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo tx.signWith(seller.storage.myLegalIdentityKey) tx.toSignedTransaction(true) } - seller.services.storageService.validatedTransactions[issuance.id] = issuance + seller.services.storageService.validatedTransactions.addTransaction(issuance) val sessionID = random63BitValue() val buyerProtocol = TwoPartyTradeProtocol.Buyer(seller.net.myAddress, notary.info.identity, diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 47265b5994..43a09867ab 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -3,8 +3,8 @@ package com.r3corda.node.services.persistence import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.StorageService -import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.protocols.FetchAttachmentsProtocol import com.r3corda.protocols.FetchDataProtocol import com.r3corda.protocols.FetchTransactionsProtocol @@ -43,7 +43,7 @@ class DataVendingService(net: MessagingService, private val storage: StorageServ private fun handleTXRequest(req: FetchDataProtocol.Request): List { require(req.hashes.isNotEmpty()) return req.hashes.map { - val tx = storage.validatedTransactions[it] + val tx = storage.validatedTransactions.getTransaction(it) if (tx == null) logger.info("Got request for unknown tx $it") tx diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt new file mode 100644 index 0000000000..6e063c5181 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt @@ -0,0 +1,48 @@ +package com.r3corda.node.services.persistence + +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.TransactionStorage +import com.r3corda.core.serialization.deserialize +import com.r3corda.core.serialization.serialize +import com.r3corda.core.utilities.loggerFor +import com.r3corda.core.utilities.trace +import java.nio.file.Files +import java.nio.file.Path +import java.util.concurrent.ConcurrentHashMap +import javax.annotation.concurrent.ThreadSafe + +/** + * File-based transaction storage, storing transactions per file. + */ +@ThreadSafe +class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage { + + companion object { + private val logger = loggerFor() + private val fileExtension = ".transaction" + } + + private val _transactions = ConcurrentHashMap() + + init { + logger.trace { "Initialising per file transaction storage on $storeDir" } + Files.createDirectories(storeDir) + Files.list(storeDir) + .filter { it.toString().toLowerCase().endsWith(fileExtension) } + .map { Files.readAllBytes(it).deserialize() } + .forEach { _transactions[it.id] = it } + } + + override fun addTransaction(transaction: SignedTransaction) { + val transactionFile = storeDir.resolve("${transaction.id.toString().toLowerCase()}$fileExtension") + transaction.serialize().writeToFile(transactionFile) + _transactions[transaction.id] = transaction + logger.trace { "Stored $transaction to $transactionFile" } + } + + override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id] + + val transactions: Iterable get() = _transactions.values.toList() + +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt index 86a485773e..a6b9c0f906 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt @@ -1,41 +1,14 @@ package com.r3corda.node.services.persistence -import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.crypto.Party -import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.StorageService +import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.serialization.SingletonSerializeAsToken -import com.r3corda.core.utilities.RecordingMap -import org.slf4j.LoggerFactory import java.security.KeyPair -import java.util.* open class StorageServiceImpl(override val attachments: AttachmentStorage, + override val validatedTransactions: TransactionStorage, override val myLegalIdentityKey: KeyPair, - override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), - // This parameter is for unit tests that want to observe operation details. - val recordingAs: (String) -> String = { tableName -> "" }) -: SingletonSerializeAsToken(), StorageService { - protected val tables = HashMap>() - - private fun getMapOriginal(tableName: String): MutableMap { - synchronized(tables) { - @Suppress("UNCHECKED_CAST") - return tables.getOrPut(tableName) { - recorderWrap(Collections.synchronizedMap(HashMap()), tableName) - } as MutableMap - } - } - - private fun recorderWrap(map: MutableMap, tableName: String): MutableMap { - if (recordingAs(tableName) != "") - return RecordingMap(map, LoggerFactory.getLogger("recordingmap.${recordingAs(tableName)}")) - else - return map - } - - override val validatedTransactions: MutableMap - get() = getMapOriginal("validated-transactions") - -} \ No newline at end of file + override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) +: SingletonSerializeAsToken(), StorageService diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index 3b9e41d300..3a45fbe148 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -1,8 +1,8 @@ package com.r3corda.node.messaging import com.google.common.util.concurrent.ListenableFuture -import com.r3corda.contracts.cash.Cash import com.r3corda.contracts.CommercialPaper +import com.r3corda.contracts.cash.Cash import com.r3corda.contracts.testing.CASH import com.r3corda.contracts.testing.`issued by` import com.r3corda.contracts.testing.`owned by` @@ -14,16 +14,17 @@ import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.node.services.Wallet import com.r3corda.core.random63BitValue import com.r3corda.core.seconds import com.r3corda.core.testing.* import com.r3corda.core.utilities.BriefLogFormatter -import com.r3corda.core.utilities.RecordingMap import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.persistence.NodeAttachmentService +import com.r3corda.node.services.persistence.PerFileTransactionStorage import com.r3corda.node.services.persistence.StorageServiceImpl import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.wallet.NodeWalletService @@ -38,13 +39,12 @@ import java.io.ByteArrayOutputStream import java.nio.file.Path import java.security.KeyPair import java.security.PublicKey -import java.util.Currency +import java.util.* import java.util.concurrent.ExecutionException import java.util.jar.JarOutputStream import java.util.zip.ZipEntry import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertNotNull import kotlin.test.assertTrue /** @@ -186,15 +186,15 @@ class TwoPartyTradeProtocolTests { // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. assertThat(bobNode.checkpointStorage.checkpoints).hasSize(1) - // TODO: remove once validated transactions are persisted to disk - val recordedTransactions = bobNode.storage.validatedTransactions + val bobTransactionsBeforeCrash = (bobNode.storage.validatedTransactions as PerFileTransactionStorage).transactions + assertThat(bobTransactionsBeforeCrash).isNotEmpty() // .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk. bobNode.stop() // Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use. // She will wait around until Bob comes back. - assertNotNull(pumpAlice()) + assertThat(pumpAlice()).isNotNull() // ... bring the node back up ... the act of constructing the SMM will re-register the message handlers // that Bob was waiting on before the reboot occurred. @@ -205,9 +205,6 @@ class TwoPartyTradeProtocolTests { } }, true, BOB.name, BOB_KEY) - // TODO: remove once validated transactions are persisted to disk - bobNode.storage.validatedTransactions.putAll(recordedTransactions) - // Find the future representing the result of this state machine again. val bobFuture = bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java).single().second @@ -215,12 +212,15 @@ class TwoPartyTradeProtocolTests { net.runNetwork() // Bob is now finished and has the same transaction as Alice. - assertEquals(bobFuture.get(), aliceFuture.get()) + assertThat(bobFuture.get()).isEqualTo(aliceFuture.get()) assertThat(bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java)).isEmpty() assertThat(bobNode.checkpointStorage.checkpoints).isEmpty() assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty() + + val restoredBobTransactions = bobTransactionsBeforeCrash.filter { bobNode.storage.validatedTransactions.getTransaction(it.id) != null } + assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash) } } @@ -233,9 +233,11 @@ class TwoPartyTradeProtocolTests { advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair) { // That constructs the storage service object in a customised way ... - override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl { - // To use RecordingMaps instead of ordinary HashMaps. - return StorageServiceImpl(attachments, keypair, identity, { tableName -> name }) + override fun constructStorageService(attachments: NodeAttachmentService, + transactionStorage: TransactionStorage, + keypair: KeyPair, + identity: Party): StorageServiceImpl { + return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), keypair, identity) } } } @@ -243,7 +245,7 @@ class TwoPartyTradeProtocolTests { } @Test - fun checkDependenciesOfSaleAssetAreResolved() { + fun `check dependencies of sale asset are resolved`() { transactionGroupFor { val notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY) val aliceNode = makeNodeWithTracking(notaryNode.info, ALICE.name, ALICE_KEY) @@ -289,22 +291,21 @@ class TwoPartyTradeProtocolTests { net.runNetwork() run { - val records = (bobNode.storage.validatedTransactions as RecordingMap).records + val records = (bobNode.storage.validatedTransactions as RecordingTransactionStorage).records // Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice. - val expected = listOf( + assertThat(records).containsExactly( // Buyer Bob is told about Alice's commercial paper, but doesn't know it .. - RecordingMap.Get(alicesFakePaper[0].id), + TxRecord.Get(alicesFakePaper[0].id), // He asks and gets the tx, validates it, sees it's a self issue with no dependencies, stores. - RecordingMap.Put(alicesFakePaper[0].id, alicesSignedTxns.values.first()), + TxRecord.Add(alicesSignedTxns.values.first()), // Alice gets Bob's proposed transaction and doesn't know his two cash states. She asks, Bob answers. - RecordingMap.Get(bobsFakeCash[1].id), - RecordingMap.Get(bobsFakeCash[2].id), + TxRecord.Get(bobsFakeCash[1].id), + TxRecord.Get(bobsFakeCash[2].id), // Alice notices that Bob's cash txns depend on a third tx she also doesn't know. She asks, Bob answers. - RecordingMap.Get(bobsFakeCash[0].id), + TxRecord.Get(bobsFakeCash[0].id), // Bob wants to verify that the tx has been signed by the correct Notary, which requires looking up an input state - RecordingMap.Get(bobsFakeCash[1].id) + TxRecord.Get(bobsFakeCash[1].id) ) - assertEquals(expected, records) // Bob has downloaded the attachment. bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use { @@ -316,33 +317,32 @@ class TwoPartyTradeProtocolTests { // And from Alice's perspective ... run { - val records = (aliceNode.storage.validatedTransactions as RecordingMap).records - val expected = listOf( + val records = (aliceNode.storage.validatedTransactions as RecordingTransactionStorage).records + assertThat(records).containsExactly( // Seller Alice sends her seller info to Bob, who wants to check the asset for sale. // He requests, Alice looks up in her DB to send the tx to Bob - RecordingMap.Get(alicesFakePaper[0].id), + TxRecord.Get(alicesFakePaper[0].id), // Seller Alice gets a proposed tx which depends on Bob's two cash txns and her own tx. - RecordingMap.Get(bobsFakeCash[1].id), - RecordingMap.Get(bobsFakeCash[2].id), - RecordingMap.Get(alicesFakePaper[0].id), + TxRecord.Get(bobsFakeCash[1].id), + TxRecord.Get(bobsFakeCash[2].id), + TxRecord.Get(alicesFakePaper[0].id), // Alice notices that Bob's cash txns depend on a third tx she also doesn't know. - RecordingMap.Get(bobsFakeCash[0].id), + TxRecord.Get(bobsFakeCash[0].id), // Bob answers with the transactions that are now all verifiable, as Alice bottomed out. // Bob's transactions are valid, so she commits to the database - RecordingMap.Put(bobsFakeCash[1].id, bobsSignedTxns[bobsFakeCash[1].id]), - RecordingMap.Put(bobsFakeCash[2].id, bobsSignedTxns[bobsFakeCash[2].id]), - RecordingMap.Put(bobsFakeCash[0].id, bobsSignedTxns[bobsFakeCash[0].id]), + TxRecord.Add(bobsSignedTxns[bobsFakeCash[1].id]!!), + TxRecord.Add(bobsSignedTxns[bobsFakeCash[2].id]!!), + TxRecord.Add(bobsSignedTxns[bobsFakeCash[0].id]!!), // Now she verifies the transaction is contract-valid (not signature valid) which means // looking up the states again. - RecordingMap.Get(bobsFakeCash[1].id), - RecordingMap.Get(bobsFakeCash[2].id), - RecordingMap.Get(alicesFakePaper[0].id), + TxRecord.Get(bobsFakeCash[1].id), + TxRecord.Get(bobsFakeCash[2].id), + TxRecord.Get(alicesFakePaper[0].id), // Alice needs to look up the input states to find out which Notary they point to - RecordingMap.Get(bobsFakeCash[1].id), - RecordingMap.Get(bobsFakeCash[2].id), - RecordingMap.Get(alicesFakePaper[0].id) + TxRecord.Get(bobsFakeCash[1].id), + TxRecord.Get(bobsFakeCash[2].id), + TxRecord.Get(alicesFakePaper[0].id) ) - assertEquals(expected, records) } } } @@ -415,7 +415,11 @@ class TwoPartyTradeProtocolTests { services: ServiceHub, vararg extraKeys: KeyPair): Map { val signed: List = signAll(wtxToSign, *extraKeys) - services.recordTransactions(signed, skipRecordingMap = true) + services.recordTransactions(signed) + val validatedTransactions = services.storageService.validatedTransactions + if (validatedTransactions is RecordingTransactionStorage) { + validatedTransactions.records.clear() + } return signed.associateBy { it.id } } @@ -465,4 +469,27 @@ class TwoPartyTradeProtocolTests { val wallet = WalletImpl(listOf>(lookup("alice's paper"))) return Pair(wallet, listOf(ap)) } + + + class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage { + + val records = Collections.synchronizedList(ArrayList()) + + override fun addTransaction(transaction: SignedTransaction) { + records.add(TxRecord.Add(transaction)) + delegate.addTransaction(transaction) + } + + override fun getTransaction(id: SecureHash): SignedTransaction? { + records.add(TxRecord.Get(id)) + return delegate.getTransaction(id) + } + } + + interface TxRecord { + data class Add(val transaction: SignedTransaction) : TxRecord + data class Get(val id: SecureHash): TxRecord + } + + } diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index b4331bf88a..9fb51f1065 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -6,8 +6,6 @@ import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.node.serialization.NodeClock -import com.r3corda.node.services.api.Checkpoint -import com.r3corda.node.services.api.CheckpointStorage import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.network.MockNetworkMapCache @@ -15,23 +13,6 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.DataVendingService import com.r3corda.node.services.wallet.NodeWalletService import java.time.Clock -import java.util.concurrent.ConcurrentLinkedQueue - -class MockCheckpointStorage : CheckpointStorage { - - private val _checkpoints = ConcurrentLinkedQueue() - override val checkpoints: Iterable - get() = _checkpoints.toList() - - override fun addCheckpoint(checkpoint: Checkpoint) { - _checkpoints.add(checkpoint) - } - - override fun removeCheckpoint(checkpoint: Checkpoint) { - require(_checkpoints.remove(checkpoint)) - } -} - class MockServices( customWallet: WalletService? = null, diff --git a/node/src/test/kotlin/com/r3corda/node/services/PerFileCheckpointStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt similarity index 97% rename from node/src/test/kotlin/com/r3corda/node/services/PerFileCheckpointStorageTests.kt rename to node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt index c40302af59..93149ba6f5 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/PerFileCheckpointStorageTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt @@ -1,11 +1,10 @@ -package com.r3corda.node.services +package com.r3corda.node.services.persistence import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.primitives.Ints import com.r3corda.core.serialization.SerializedBytes import com.r3corda.node.services.api.Checkpoint -import com.r3corda.node.services.persistence.PerFileCheckpointStorage import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt new file mode 100644 index 0000000000..d7a347c69a --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt @@ -0,0 +1,85 @@ +package com.r3corda.node.services.persistence + +import com.google.common.jimfs.Configuration.unix +import com.google.common.jimfs.Jimfs +import com.google.common.primitives.Ints +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.DigitalSignature +import com.r3corda.core.crypto.NullPublicKey +import com.r3corda.core.serialization.SerializedBytes +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.nio.file.Files + +class PerFileTransactionStorageTests { + + val fileSystem = Jimfs.newFileSystem(unix()) + val storeDir = fileSystem.getPath("store") + lateinit var transactionStorage: PerFileTransactionStorage + + @Before + fun setUp() { + newTransactionStorage() + } + + @After + fun cleanUp() { + fileSystem.close() + } + + @Test + fun `empty store`() { + assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull() + assertThat(transactionStorage.transactions).isEmpty() + newTransactionStorage() + assertThat(transactionStorage.transactions).isEmpty() + } + + @Test + fun `one transaction`() { + val transaction = newTransaction() + transactionStorage.addTransaction(transaction) + assertTransactionIsRetrievable(transaction) + assertThat(transactionStorage.transactions).containsExactly(transaction) + newTransactionStorage() + assertTransactionIsRetrievable(transaction) + assertThat(transactionStorage.transactions).containsExactly(transaction) + } + + @Test + fun `two transactions across restart`() { + val firstTransaction = newTransaction() + val secondTransaction = newTransaction() + transactionStorage.addTransaction(firstTransaction) + newTransactionStorage() + transactionStorage.addTransaction(secondTransaction) + assertTransactionIsRetrievable(firstTransaction) + assertTransactionIsRetrievable(secondTransaction) + assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction) + } + + @Test + fun `non-transaction files are ignored`() { + val transactions = newTransaction() + transactionStorage.addTransaction(transactions) + Files.write(storeDir.resolve("random-non-tx-file"), "this is not a transaction!!".toByteArray()) + newTransactionStorage() + assertThat(transactionStorage.transactions).containsExactly(transactions) + } + + private fun newTransactionStorage() { + transactionStorage = PerFileTransactionStorage(storeDir) + } + + private fun assertTransactionIsRetrievable(transaction: SignedTransaction) { + assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction) + } + + private var txCount = 0 + private fun newTransaction() = SignedTransaction( + SerializedBytes(Ints.toByteArray(++txCount)), + listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1)))) + +} \ No newline at end of file