Add StateMachine -> Recorded TX mapping stream and emit events on transaction records

This commit is contained in:
Andras Slemmer 2016-09-22 15:26:59 +01:00
parent 6c96517f6f
commit 5b10c207e0
28 changed files with 230 additions and 43 deletions

View File

@ -6,6 +6,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.days import com.r3corda.core.days
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.transactions.LedgerTransaction import com.r3corda.core.transactions.LedgerTransaction
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction

View File

@ -1,6 +1,7 @@
package com.r3corda.contracts package com.r3corda.contracts
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.core.utilities.DUMMY_NOTARY

View File

@ -36,14 +36,6 @@ interface ServiceHub {
*/ */
fun recordTransactions(txs: Iterable<SignedTransaction>) fun recordTransactions(txs: Iterable<SignedTransaction>)
/**
* Given some [SignedTransaction]s, writes them to the local storage for validated transactions and then
* sends them to the vault for further processing.
*
* @param txs The transactions to record.
*/
fun recordTransactions(vararg txs: SignedTransaction) = recordTransactions(txs.toList())
/** /**
* Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState]. * Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState].
* *
@ -60,4 +52,11 @@ interface ServiceHub {
* @throws IllegalProtocolLogicException or IllegalArgumentException if there are problems with the [logicType] or [args]. * @throws IllegalProtocolLogicException or IllegalArgumentException if there are problems with the [logicType] or [args].
*/ */
fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T> fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T>
} }
/**
* Given some [SignedTransaction]s, writes them to the local storage for validated transactions and then
* sends them to the vault for further processing.
*
* @param txs The transactions to record.
*/
fun ServiceHub.recordTransactions(vararg txs: SignedTransaction) = recordTransactions(txs.toList())

View File

@ -186,6 +186,8 @@ interface StorageService {
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage val attachments: AttachmentStorage
val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
/** /**
* Returns the legal identity that this node is configured with. Assumed to be initialised when the node is * Returns the legal identity that this node is configured with. Assumed to be initialised when the node is
* first installed. * first installed.

View File

@ -0,0 +1,16 @@
package com.r3corda.core.node.services
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.protocols.StateMachineRunId
import rx.Observable
data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRunId, val transactionId: SecureHash)
/**
* This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded
* during a protocol run [addMapping] should be called.
*/
interface StateMachineRecordedTransactionMappingStorage {
fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash)
fun track(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>
}

View File

@ -1,6 +1,7 @@
package com.r3corda.core.protocols package com.r3corda.core.protocols
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
@ -10,7 +11,7 @@ import java.util.*
data class StateMachineRunId private constructor(val uuid: UUID) { data class StateMachineRunId private constructor(val uuid: UUID) {
companion object { companion object {
fun createRandom() = StateMachineRunId(UUID.randomUUID()) fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID())
} }
} }

View File

@ -7,6 +7,7 @@ import com.r3corda.core.contracts.StateRef
import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
@ -66,7 +67,7 @@ abstract class AbstractStateReplacementProtocol<T> {
} }
val finalTx = stx + signatures val finalTx = stx + signatures
serviceHub.recordTransactions(listOf(finalTx)) serviceHub.recordTransactions(finalTx)
return finalTx.tx.outRef(0) return finalTx.tx.outRef(0)
} }
@ -164,7 +165,7 @@ abstract class AbstractStateReplacementProtocol<T> {
val finalTx = stx + allSignatures val finalTx = stx + allSignatures
finalTx.verifySignatures() finalTx.verifySignatures()
serviceHub.recordTransactions(listOf(finalTx)) serviceHub.recordTransactions(finalTx)
} }
@Suspendable @Suspendable
@ -219,4 +220,4 @@ class StateReplacementRefused(val identity: Party, val state: StateRef, val deta
override fun toString() = "A participant $identity refused to change state $state: " + (detail ?: "no reason provided") override fun toString() = "A participant $identity refused to change state $state: " + (detail ?: "no reason provided")
} }
class StateReplacementException(val error: StateReplacementRefused) : Exception("State change failed - $error") class StateReplacementException(val error: StateReplacementRefused) : Exception("State change failed - $error")

View File

@ -3,6 +3,7 @@ package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.checkedAdd import com.r3corda.core.checkedAdd
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.transactions.LedgerTransaction import com.r3corda.core.transactions.LedgerTransaction
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction

View File

@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
@ -139,7 +140,7 @@ object TwoPartyDealProtocol {
progressTracker.currentStep = RECORDING progressTracker.currentStep = RECORDING
serviceHub.recordTransactions(listOf(fullySigned)) serviceHub.recordTransactions(fullySigned)
logger.trace { "Deal stored" } logger.trace { "Deal stored" }
@ -219,7 +220,7 @@ object TwoPartyDealProtocol {
logger.trace { "Signatures received are valid. Deal transaction complete! :-)" } logger.trace { "Signatures received are valid. Deal transaction complete! :-)" }
progressTracker.currentStep = RECORDING progressTracker.currentStep = RECORDING
serviceHub.recordTransactions(listOf(fullySigned)) serviceHub.recordTransactions(fullySigned)
logger.trace { "Deal transaction stored" } logger.trace { "Deal transaction stored" }
return fullySigned return fullySigned

View File

@ -5,6 +5,7 @@ import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.crypto.NullSignature import com.r3corda.core.crypto.NullSignature
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.serialization.opaque import com.r3corda.core.serialization.opaque
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
import com.r3corda.testing.node.MockNetwork import com.r3corda.testing.node.MockNetwork

View File

@ -38,10 +38,7 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.services.network.PersistentNetworkMapService import com.r3corda.node.services.network.PersistentNetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.*
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.persistence.PerFileTransactionStorage
import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
@ -112,8 +109,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
return smm.add(loggerName, logic).resultFuture return smm.add(loggerName, logic).resultFuture
} }
override fun recordTransactions(txs: Iterable<SignedTransaction>) = override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(storage, txs)
recordTransactionsInternal(storage, txs)
} }
val info: NodeInfo by lazy { val info: NodeInfo by lazy {
@ -424,14 +420,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions")) val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions"))
_servicesThatAcceptUploads += attachments _servicesThatAcceptUploads += attachments
val (identity, keyPair) = obtainKeyPair(dir) val (identity, keyPair) = obtainKeyPair(dir)
return Pair(constructStorageService(attachments, transactionStorage, keyPair, identity), checkpointStorage) val stateMachineTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage()
return Pair(
constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage, keyPair, identity),
checkpointStorage
)
} }
protected open fun constructStorageService(attachments: NodeAttachmentService, protected open fun constructStorageService(attachments: NodeAttachmentService,
transactionStorage: TransactionStorage, transactionStorage: TransactionStorage,
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage,
keyPair: KeyPair, keyPair: KeyPair,
identity: Party) = identity: Party) =
StorageServiceImpl(attachments, transactionStorage, keyPair, identity) StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage, keyPair, identity)
private fun obtainKeyPair(dir: Path): Pair<Party, KeyPair> { private fun obtainKeyPair(dir: Path): Pair<Party, KeyPair> {
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that // Load the private identity key, creating it if necessary. The identity key is a long term well known key that

View File

@ -37,4 +37,5 @@ class ServerRPCOps(
changes.map { StateMachineUpdate.fromStateMachineChange(it) } changes.map { StateMachineUpdate.fromStateMachineChange(it) }
) )
} }
override fun stateMachineRecordedTransactionMapping() = services.storageService.stateMachineRecordedTransactionMapping.track()
} }

View File

@ -80,14 +80,14 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
topic: String, topic: String,
loggerName: String, loggerName: String,
crossinline protocolFactory: (H) -> ProtocolLogic<R>, crossinline protocolFactory: (H) -> ProtocolLogic<R>,
crossinline onResultFuture: (ListenableFuture<R>, H) -> Unit) { crossinline onResultFuture: ProtocolLogic<R>.(ListenableFuture<R>, H) -> Unit) {
net.addMessageHandler(topic, DEFAULT_SESSION_ID, null) { message, reg -> net.addMessageHandler(topic, DEFAULT_SESSION_ID, null) { message, reg ->
try { try {
val handshake = message.data.deserialize<H>() val handshake = message.data.deserialize<H>()
val protocol = protocolFactory(handshake) val protocol = protocolFactory(handshake)
protocol.registerSession(handshake) protocol.registerSession(handshake)
val resultFuture = services.startProtocol(loggerName, protocol) val resultFuture = services.startProtocol(loggerName, protocol)
onResultFuture(resultFuture, handshake) protocol.onResultFuture(resultFuture, handshake)
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Unable to process ${H::class.java.name} message", e) logger.error("Unable to process ${H::class.java.name} message", e)
} }

View File

@ -7,7 +7,8 @@ import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl
import org.slf4j.LoggerFactory
interface MessagingServiceInternal : MessagingService { interface MessagingServiceInternal : MessagingService {
/** /**
@ -32,6 +33,8 @@ interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
fun start(): ListenableFuture<out T> fun start(): ListenableFuture<out T>
} }
private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java)
abstract class ServiceHubInternal : ServiceHub { abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
@ -46,6 +49,14 @@ abstract class ServiceHubInternal : ServiceHub {
* @param txs The transactions to record. * @param txs The transactions to record.
*/ */
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) { internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
val stateMachineRunId = ProtocolStateMachineImpl.retrieveCurrentStateMachine()?.id
if (stateMachineRunId != null) {
txs.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
}
} else {
log.warn("Transaction recorded from outside of a state machine")
}
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) } txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) }
vaultService.notifyAll(txs.map { it.tx }) vaultService.notifyAll(txs.map { it.tx })
} }

View File

@ -2,6 +2,8 @@ package com.r3corda.node.services.messaging
import com.r3corda.core.contracts.ContractState import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
@ -71,4 +73,6 @@ interface CordaRPCOps : RPCOps {
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>>
@RPCReturnsObservables
fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>
} }

View File

@ -6,6 +6,7 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.TopicSession import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.core.success import com.r3corda.core.success
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
@ -81,7 +82,7 @@ object DataVending {
}, },
{ future, req -> { future, req ->
future.success { future.success {
services.recordTransactions(req.tx) serviceHub.recordTransactions(req.tx)
}.failure { throwable -> }.failure { throwable ->
logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable) logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable)
} }

View File

@ -0,0 +1,47 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.ThreadBox
import com.r3corda.core.bufferUntilSubscribed
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.protocols.StateMachineRunId
import rx.Observable
import rx.subjects.PublishSubject
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* This is a temporary in-memory storage of a state machine id -> txhash mapping
*
* TODO persist this instead
*/
@ThreadSafe
class InMemoryStateMachineRecordedTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage {
private val mutex = ThreadBox(object {
val stateMachineTransactionMap = HashMap<StateMachineRunId, HashSet<SecureHash>>()
val updates = PublishSubject.create<StateMachineTransactionMapping>()
})
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
mutex.locked {
stateMachineTransactionMap.getOrPut(stateMachineRunId) { HashSet() }.add(transactionId)
updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}
override fun track():
Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
mutex.locked {
return Pair(
stateMachineTransactionMap.flatMap { entry ->
entry.value.map {
StateMachineTransactionMapping(entry.key, it)
}
},
updates.bufferUntilSubscribed()
)
}
}
}

View File

@ -2,6 +2,7 @@ package com.r3corda.node.services.persistence
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.node.services.TransactionStorage
import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
@ -9,6 +10,7 @@ import java.security.KeyPair
open class StorageServiceImpl(override val attachments: AttachmentStorage, open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage, override val validatedTransactions: TransactionStorage,
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
override val myLegalIdentityKey: KeyPair, override val myLegalIdentityKey: KeyPair,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public))
: SingletonSerializeAsToken(), TxWritableStorageService : SingletonSerializeAsToken(), TxWritableStorageService

View File

@ -150,4 +150,12 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
createTransaction() createTransaction()
} }
companion object {
/**
* Retrieves our state machine id if we are running a [ProtocolStateMachineImpl].
*/
fun retrieveCurrentStateMachine(): ProtocolStateMachineImpl<*>? {
return Strand.currentStrand() as? ProtocolStateMachineImpl<*>
}
}
} }

View File

@ -16,6 +16,7 @@ import com.r3corda.node.utilities.AbstractJDBCHashSet
import com.r3corda.node.utilities.JDBCHashedTable import com.r3corda.node.utilities.JDBCHashedTable
import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement import org.jetbrains.exposed.sql.statements.InsertStatement
import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.security.PublicKey import java.security.PublicKey
@ -99,6 +100,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
if (netDelta != Vault.NoUpdate) { if (netDelta != Vault.NoUpdate) {
mutex.locked { mutex.locked {
recordUpdate(netDelta) recordUpdate(netDelta)
_updatesPublisher.onNext(netDelta)
} }
} }
return currentVault return currentVault

View File

@ -9,9 +9,8 @@ import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.days import com.r3corda.core.days
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction import com.r3corda.core.transactions.WireTransaction
import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.core.utilities.DUMMY_NOTARY
@ -121,7 +120,7 @@ class TwoPartyTradeProtocolTests {
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).second val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerFuture
// Everything is on this thread so we can now step through the protocol one step at a time. // Everything is on this thread so we can now step through the protocol one step at a time.
// Seller Alice already sent a message to Buyer Bob. Pump once: // Seller Alice already sent a message to Buyer Bob. Pump once:
@ -187,11 +186,14 @@ class TwoPartyTradeProtocolTests {
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) {
// That constructs the storage service object in a customised way ... // That constructs the storage service object in a customised way ...
override fun constructStorageService(attachments: NodeAttachmentService, override fun constructStorageService(
transactionStorage: TransactionStorage, attachments: NodeAttachmentService,
keyPair: KeyPair, transactionStorage: TransactionStorage,
identity: Party): StorageServiceImpl { stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage,
return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), keyPair, identity) keyPair: KeyPair,
identity: Party
): StorageServiceImpl {
return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), stateMachineRecordedTransactionMappingStorage, keyPair, identity)
} }
} }
} }
@ -288,6 +290,69 @@ class TwoPartyTradeProtocolTests {
} }
} }
@Test
fun `track() works`() {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY)
ledger(aliceNode.services) {
// Insert a prospectus type attachment into the commercial paper transaction.
val stream = ByteArrayOutputStream()
JarOutputStream(stream).use {
it.putNextEntry(ZipEntry("Prospectus.txt"))
it.write("Our commercial paper is top notch stuff".toByteArray())
it.closeEntry()
}
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode.services)
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
net.runNetwork() // Clear network map registration messages
val aliceTxStream = aliceNode.storage.validatedTransactions.track().second
val aliceTxMappings = aliceNode.storage.stateMachineRecordedTransactionMapping.track().second
val (bobResult, aliceResult, bobSmId, aliceSmId) = runBuyerAndSeller("alice's paper".outputStateAndRef())
net.runNetwork()
// We need to declare this here, if we do it inside [expectEvents] kotlin throws an internal compiler error(!).
val aliceTxExpectations = sequence(
expect { tx: SignedTransaction ->
require(tx.id == bobsFakeCash[0].id)
},
expect { tx: SignedTransaction ->
require(tx.id == bobsFakeCash[2].id)
},
expect { tx: SignedTransaction ->
require(tx.id == bobsFakeCash[1].id)
}
)
aliceTxStream.expectEvents { aliceTxExpectations }
val aliceMappingExpectations = sequence(
expect { mapping: StateMachineTransactionMapping ->
require(mapping.stateMachineRunId == aliceSmId)
require(mapping.transactionId == bobsFakeCash[0].id)
},
expect { mapping: StateMachineTransactionMapping ->
require(mapping.stateMachineRunId == aliceSmId)
require(mapping.transactionId == bobsFakeCash[2].id)
},
expect { mapping: StateMachineTransactionMapping ->
require(mapping.stateMachineRunId == aliceSmId)
require(mapping.transactionId == bobsFakeCash[1].id)
}
)
aliceTxMappings.expectEvents { aliceMappingExpectations }
}
}
@Test @Test
fun `dependency with error on buyer side`() { fun `dependency with error on buyer side`() {
ledger { ledger {
@ -302,15 +367,21 @@ class TwoPartyTradeProtocolTests {
} }
} }
data class RunResult(
val buyerFuture: Future<SignedTransaction>,
val sellerFuture: Future<SignedTransaction>,
val buyerSmId: StateMachineRunId,
val sellerSmId: StateMachineRunId
)
private fun runBuyerAndSeller(assetToSell: StateAndRef<OwnableState>) : Pair<Future<SignedTransaction>, Future<SignedTransaction>> { private fun runBuyerAndSeller(assetToSell: StateAndRef<OwnableState>): RunResult {
val buyer = Buyer(aliceNode.info.identity, notaryNode.info.identity, 1000.DOLLARS, CommercialPaper.State::class.java) val buyer = Buyer(aliceNode.info.identity, notaryNode.info.identity, 1000.DOLLARS, CommercialPaper.State::class.java)
val seller = Seller(bobNode.info.identity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY) val seller = Seller(bobNode.info.identity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
connectProtocols(buyer, seller) connectProtocols(buyer, seller)
// We start the Buyer first, as the Seller sends the first message // We start the Buyer first, as the Seller sends the first message
val buyerResult = bobNode.smm.add("$TOPIC.buyer", buyer).resultFuture val buyerPsm = bobNode.smm.add("$TOPIC.buyer", buyer)
val sellerResult = aliceNode.smm.add("$TOPIC.seller", seller).resultFuture val sellerPsm = aliceNode.smm.add("$TOPIC.seller", seller)
return Pair(buyerResult, sellerResult) return RunResult(buyerPsm.resultFuture, sellerPsm.resultFuture, buyerPsm.id, sellerPsm.id)
} }
private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.runWithError( private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.runWithError(

View File

@ -6,6 +6,7 @@ import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.node.services.* import com.r3corda.core.node.services.*
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.MonitoringService
@ -56,8 +57,7 @@ open class MockServiceHubInternal(
private val txStorageService: TxWritableStorageService private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException() get() = storage ?: throw UnsupportedOperationException()
override fun recordTransactions(txs: Iterable<SignedTransaction>) = override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
recordTransactionsInternal(txStorageService, txs)
lateinit var smm: StateMachineManager lateinit var smm: StateMachineManager

View File

@ -6,6 +6,7 @@ import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.days import com.r3corda.core.days
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRef import com.r3corda.core.protocols.ProtocolLogicRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory

View File

@ -5,6 +5,7 @@ import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.contracts.asset.cashBalances import com.r3corda.contracts.asset.cashBalances
import com.r3corda.contracts.testing.fillWithSomeTestCash import com.r3corda.contracts.testing.fillWithSomeTestCash
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.node.services.VaultService import com.r3corda.core.node.services.VaultService
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.core.utilities.DUMMY_NOTARY

View File

@ -9,6 +9,7 @@ import com.r3corda.core.contracts.DOLLARS
import com.r3corda.core.contracts.OwnableState import com.r3corda.core.contracts.OwnableState
import com.r3corda.core.contracts.`issued by` import com.r3corda.core.contracts.`issued by`
import com.r3corda.core.days import com.r3corda.core.days
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.protocols.TwoPartyTradeProtocol import com.r3corda.protocols.TwoPartyTradeProtocol

View File

@ -3,6 +3,7 @@ package com.r3corda.testing
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.* import com.r3corda.core.crypto.*
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.recordTransactions
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.TransactionBuilder import com.r3corda.core.transactions.TransactionBuilder

View File

@ -10,9 +10,11 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.* import com.r3corda.core.node.services.*
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.core.utilities.DUMMY_NOTARY
import com.r3corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage
import com.r3corda.testing.MEGA_CORP import com.r3corda.testing.MEGA_CORP
import com.r3corda.testing.MINI_CORP import com.r3corda.testing.MINI_CORP
import rx.Observable import rx.Observable
@ -42,6 +44,9 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
} }
override fun recordTransactions(txs: Iterable<SignedTransaction>) { override fun recordTransactions(txs: Iterable<SignedTransaction>) {
txs.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(StateMachineRunId.createRandom(), it.id)
}
for (stx in txs) { for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx) storageService.validatedTransactions.addTransaction(stx)
} }
@ -116,6 +121,10 @@ class MockAttachmentStorage : AttachmentStorage {
} }
} }
class MockStateMachineRecordedTransactionMappingStorage(
val storage: StateMachineRecordedTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage()
) : StateMachineRecordedTransactionMappingStorage by storage
open class MockTransactionStorage : TransactionStorage { open class MockTransactionStorage : TransactionStorage {
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> { override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
return Pair(txns.values.toList(), _updatesPublisher) return Pair(txns.values.toList(), _updatesPublisher)
@ -142,7 +151,8 @@ open class MockTransactionStorage : TransactionStorage {
class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(), class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(),
override val validatedTransactions: TransactionStorage = MockTransactionStorage(), override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
override val myLegalIdentityKey: KeyPair = generateKeyPair(), override val myLegalIdentityKey: KeyPair = generateKeyPair(),
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public),
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage())
: SingletonSerializeAsToken(), TxWritableStorageService : SingletonSerializeAsToken(), TxWritableStorageService
/** /**