Merged in rnicoll-force-record-tx (pull request #152)

Require all transactions are added via the ServiceHub.recordTransaction() function
This commit is contained in:
Ross Nicoll 2016-06-20 14:25:20 +01:00
commit 8f57213270
11 changed files with 75 additions and 30 deletions

View File

@ -1,7 +1,7 @@
package com.r3corda.core.contracts package com.r3corda.core.contracts
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.node.services.ReadOnlyTransactionStorage
import java.util.* import java.util.*
import java.util.concurrent.Callable import java.util.concurrent.Callable
@ -16,7 +16,7 @@ import java.util.concurrent.Callable
* @param transactions map of transaction id to [SignedTransaction] * @param transactions map of transaction id to [SignedTransaction]
* @param startPoints transactions to use as starting points for the search * @param startPoints transactions to use as starting points for the search
*/ */
class TransactionGraphSearch(val transactions: TransactionStorage, class TransactionGraphSearch(val transactions: ReadOnlyTransactionStorage,
val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> { val startPoints: List<WireTransaction>) : Callable<List<WireTransaction>> {
class Query( class Query(
val withCommandOfType: Class<out CommandData>? = null val withCommandOfType: Class<out CommandData>? = null

View File

@ -39,14 +39,18 @@ interface ServiceHub {
* Given a list of [SignedTransaction]s, writes them to the local storage for validated transactions and then * Given a list of [SignedTransaction]s, writes them to the local storage for validated transactions and then
* sends them to the wallet for further processing. * sends them to the wallet for further processing.
* *
* TODO: Need to come up with a way for preventing transactions being written other than by this method. * @param txs The transactions to record
* @see recordTransactionsInternal function to call with the writable storage service
*/
fun recordTransactions(txs: Iterable<SignedTransaction>)
/**
* Given some [SignedTransaction]s, writes them to the local storage for validated transactions and then
* sends them to the wallet for further processing.
* *
* @param txs The transactions to record * @param txs The transactions to record
*/ */
fun recordTransactions(txs: List<SignedTransaction>) { fun recordTransactions(vararg txs: SignedTransaction) = recordTransactions(txs.toList())
txs.forEach { storageService.validatedTransactions.addTransaction(it) }
walletService.notifyAll(txs.map { it.tx })
}
/** /**
* Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState] * Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState]

View File

@ -5,8 +5,7 @@ import com.r3corda.core.crypto.SecureHash
import java.io.InputStream import java.io.InputStream
/** /**
* An attachment store records potentially large binary objects, identified by their hash. Note that attachments are * An attachment store records potentially large binary objects, identified by their hash.
* immutable and can never be erased once inserted!
*/ */
interface AttachmentStorage { interface AttachmentStorage {
/** /**

View File

@ -159,7 +159,7 @@ interface StorageService {
* The signatures aren't technically needed after that point, but we keep them around so that we can relay * The signatures aren't technically needed after that point, but we keep them around so that we can relay
* the transaction data to other nodes that need it. * the transaction data to other nodes that need it.
*/ */
val validatedTransactions: TransactionStorage val validatedTransactions: ReadOnlyTransactionStorage
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage val attachments: AttachmentStorage
@ -172,4 +172,16 @@ interface StorageService {
val myLegalIdentityKey: KeyPair val myLegalIdentityKey: KeyPair
} }
/**
* Storage service, with extensions to allow validated transactions to be added to. For use only within [ServiceHub].
*/
interface TxWritableStorageService : StorageService {
/**
* A map of hash->tx where tx has been signature/contract validated and the states are known to be correct.
* The signatures aren't technically needed after that point, but we keep them around so that we can relay
* the transaction data to other nodes that need it.
*/
override val validatedTransactions: TransactionStorage
}

View File

@ -6,17 +6,21 @@ import com.r3corda.core.crypto.SecureHash
/** /**
* Thread-safe storage of transactions. * Thread-safe storage of transactions.
*/ */
interface TransactionStorage { interface ReadOnlyTransactionStorage {
/**
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten.
*/
fun addTransaction(transaction: SignedTransaction)
/** /**
* Return the transaction with the given [id], or null if no such transaction exists. * Return the transaction with the given [id], or null if no such transaction exists.
*/ */
fun getTransaction(id: SecureHash): SignedTransaction? fun getTransaction(id: SecureHash): SignedTransaction?
}
/**
* Thread-safe storage of transactions.
*/
interface TransactionStorage : ReadOnlyTransactionStorage {
/**
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten.
*/
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
fun addTransaction(transaction: SignedTransaction)
} }

View File

@ -90,4 +90,4 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac
override val validatedTransactions: TransactionStorage = MockTransactionStorage(), override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
override val myLegalIdentityKey: KeyPair = generateKeyPair(), override val myLegalIdentityKey: KeyPair = generateKeyPair(),
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
: SingletonSerializeAsToken(), StorageService : SingletonSerializeAsToken(), TxWritableStorageService

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.runOnNextMessage
@ -81,15 +82,18 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
protected val _servicesThatAcceptUploads = ArrayList<AcceptsFileUpload>() protected val _servicesThatAcceptUploads = ArrayList<AcceptsFileUpload>()
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
val services = object : ServiceHubInternal { val services = object : ServiceHubInternal() {
override val networkService: MessagingService get() = net override val networkService: MessagingService get() = net
override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache() override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache()
override val storageService: StorageService get() = storage override val storageService: TxWritableStorageService get() = storage
override val walletService: WalletService get() = wallet override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity override val identityService: IdentityService get() = identity
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val clock: Clock = platformClock override val clock: Clock = platformClock
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(storage, txs)
} }
val info: NodeInfo by lazy { val info: NodeInfo by lazy {
@ -98,7 +102,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
open fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity] open fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
lateinit var storage: StorageService lateinit var storage: TxWritableStorageService
lateinit var checkpointStorage: CheckpointStorage lateinit var checkpointStorage: CheckpointStorage
lateinit var smm: StateMachineManager lateinit var smm: StateMachineManager
lateinit var wallet: WalletService lateinit var wallet: WalletService
@ -266,7 +270,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
protected abstract fun startMessagingService() protected abstract fun startMessagingService()
protected open fun initialiseStorageService(dir: Path): Pair<StorageService, CheckpointStorage> { protected open fun initialiseStorageService(dir: Path): Pair<TxWritableStorageService, CheckpointStorage> {
val attachments = makeAttachmentStorage(dir) val attachments = makeAttachmentStorage(dir)
val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints")) val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints"))
val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions")) val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions"))

View File

@ -38,7 +38,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
tx.signWith(seller.storage.myLegalIdentityKey) tx.signWith(seller.storage.myLegalIdentityKey)
tx.toSignedTransaction(true) tx.toSignedTransaction(true)
} }
seller.services.storageService.validatedTransactions.addTransaction(issuance) seller.services.recordTransactions(issuance)
val cashIssuerKey = generateKeyPair() val cashIssuerKey = generateKeyPair()
val amount = 1000.DOLLARS `issued by` Party("Big friendly bank", cashIssuerKey.public).ref(1) val amount = 1000.DOLLARS `issued by` Party("Big friendly bank", cashIssuerKey.public).ref(1)

View File

@ -1,7 +1,21 @@
package com.r3corda.node.services.api package com.r3corda.node.services.api
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService
interface ServiceHubInternal : ServiceHub { abstract class ServiceHubInternal : ServiceHub {
val monitoringService: MonitoringService abstract val monitoringService: MonitoringService
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the wallet for further processing. This is intended for implementations to call from
* [recordTransactions].
*
* @param txs The transactions to record
*/
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) }
walletService.notifyAll(txs.map { it.tx })
}
} }

View File

@ -4,6 +4,7 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.StorageService import com.r3corda.core.node.services.StorageService
import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import java.security.KeyPair import java.security.KeyPair
@ -11,4 +12,4 @@ open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage, override val validatedTransactions: TransactionStorage,
override val myLegalIdentityKey: KeyPair, override val myLegalIdentityKey: KeyPair,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
: SingletonSerializeAsToken(), StorageService : SingletonSerializeAsToken(), TxWritableStorageService

View File

@ -1,6 +1,7 @@
package com.r3corda.node.services package com.r3corda.node.services
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.* import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.node.services.testing.MockStorageService
@ -19,11 +20,11 @@ open class MockServices(
val keyManagement: KeyManagementService? = null, val keyManagement: KeyManagementService? = null,
val net: MessagingService? = null, val net: MessagingService? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE, val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
val storage: StorageService? = MockStorageService(), val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(), val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null, val mapService: NetworkMapService? = null,
val overrideClock: Clock? = NodeClock() val overrideClock: Clock? = NodeClock()
) : ServiceHubInternal { ) : ServiceHubInternal() {
override val walletService: WalletService = customWallet ?: NodeWalletService(this) override val walletService: WalletService = customWallet ?: NodeWalletService(this)
override val keyManagementService: KeyManagementService override val keyManagementService: KeyManagementService
@ -40,6 +41,12 @@ open class MockServices(
get() = overrideClock ?: throw UnsupportedOperationException() get() = overrideClock ?: throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
// We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions()
private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException()
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(txStorageService, txs)
init { init {
if (net != null && storage != null) { if (net != null && storage != null) {