From 5b10c207e09d82e358785a38c77cd350af652c99 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 22 Sep 2016 15:26:59 +0100 Subject: [PATCH] Add StateMachine -> Recorded TX mapping stream and emit events on transaction records --- .../r3corda/contracts/CommercialPaperTests.kt | 1 + .../kotlin/com/r3corda/contracts/IRSTests.kt | 1 + .../com/r3corda/core/node/ServiceHub.kt | 17 ++-- .../r3corda/core/node/services/Services.kt | 2 + ...achineRecordedTransactionMappingStorage.kt | 16 +++ .../core/protocols/ProtocolStateMachine.kt | 3 +- .../AbstractStateReplacementProtocol.kt | 7 +- .../protocols/BroadcastTransactionProtocol.kt | 1 + .../protocols/ResolveTransactionsProtocol.kt | 1 + .../r3corda/protocols/TwoPartyDealProtocol.kt | 5 +- .../ResolveTransactionsProtocolTest.kt | 1 + .../com/r3corda/node/internal/AbstractNode.kt | 17 ++-- .../com/r3corda/node/internal/ServerRPCOps.kt | 1 + .../node/services/api/AbstractNodeService.kt | 4 +- .../node/services/api/ServiceHubInternal.kt | 13 ++- .../node/services/messaging/CordaRPCOps.kt | 4 + .../persistence/DataVendingService.kt | 3 +- ...achineRecordedTransactionMappingStorage.kt | 47 +++++++++ .../persistence/StorageServiceImpl.kt | 2 + .../statemachine/ProtocolStateMachineImpl.kt | 8 ++ .../node/services/vault/NodeVaultService.kt | 2 + .../messaging/TwoPartyTradeProtocolTests.kt | 97 ++++++++++++++++--- .../node/services/MockServiceHubInternal.kt | 4 +- .../node/services/NodeSchedulerServiceTest.kt | 1 + .../node/services/VaultWithCashTest.kt | 1 + .../com/r3corda/simulation/TradeSimulation.kt | 1 + .../kotlin/com/r3corda/testing/TestDSL.kt | 1 + .../com/r3corda/testing/node/MockServices.kt | 12 ++- 28 files changed, 230 insertions(+), 43 deletions(-) create mode 100644 core/src/main/kotlin/com/r3corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt diff --git a/contracts/src/test/kotlin/com/r3corda/contracts/CommercialPaperTests.kt b/contracts/src/test/kotlin/com/r3corda/contracts/CommercialPaperTests.kt index 70248aef4f..7c1340c9de 100644 --- a/contracts/src/test/kotlin/com/r3corda/contracts/CommercialPaperTests.kt +++ b/contracts/src/test/kotlin/com/r3corda/contracts/CommercialPaperTests.kt @@ -6,6 +6,7 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.days +import com.r3corda.core.node.recordTransactions import com.r3corda.core.seconds import com.r3corda.core.transactions.LedgerTransaction import com.r3corda.core.transactions.SignedTransaction diff --git a/contracts/src/test/kotlin/com/r3corda/contracts/IRSTests.kt b/contracts/src/test/kotlin/com/r3corda/contracts/IRSTests.kt index 43d2f7f069..8365e13406 100644 --- a/contracts/src/test/kotlin/com/r3corda/contracts/IRSTests.kt +++ b/contracts/src/test/kotlin/com/r3corda/contracts/IRSTests.kt @@ -1,6 +1,7 @@ package com.r3corda.contracts import com.r3corda.core.contracts.* +import com.r3corda.core.node.recordTransactions import com.r3corda.core.seconds import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.utilities.DUMMY_NOTARY diff --git a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt index 1545514640..78b4b13f3b 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt @@ -36,14 +36,6 @@ interface ServiceHub { */ fun recordTransactions(txs: Iterable) - /** - * Given some [SignedTransaction]s, writes them to the local storage for validated transactions and then - * sends them to the vault for further processing. - * - * @param txs The transactions to record. - */ - fun recordTransactions(vararg txs: SignedTransaction) = recordTransactions(txs.toList()) - /** * Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState]. * @@ -60,4 +52,11 @@ interface ServiceHub { * @throws IllegalProtocolLogicException or IllegalArgumentException if there are problems with the [logicType] or [args]. */ fun invokeProtocolAsync(logicType: Class>, vararg args: Any?): ListenableFuture -} \ No newline at end of file +} +/** + * Given some [SignedTransaction]s, writes them to the local storage for validated transactions and then + * sends them to the vault for further processing. + * + * @param txs The transactions to record. + */ +fun ServiceHub.recordTransactions(vararg txs: SignedTransaction) = recordTransactions(txs.toList()) diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 60a8fc1e7e..7560e19d8a 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -186,6 +186,8 @@ interface StorageService { /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ val attachments: AttachmentStorage + val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage + /** * Returns the legal identity that this node is configured with. Assumed to be initialised when the node is * first installed. diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt b/core/src/main/kotlin/com/r3corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt new file mode 100644 index 0000000000..26ce14195d --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt @@ -0,0 +1,16 @@ +package com.r3corda.core.node.services + +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.protocols.StateMachineRunId +import rx.Observable + +data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRunId, val transactionId: SecureHash) + +/** + * This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded + * during a protocol run [addMapping] should be called. + */ +interface StateMachineRecordedTransactionMappingStorage { + fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) + fun track(): Pair, Observable> +} diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt index 94987d5fc6..da0abc3c22 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt @@ -1,6 +1,7 @@ package com.r3corda.core.protocols import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.crypto.Party import com.r3corda.core.node.ServiceHub @@ -10,7 +11,7 @@ import java.util.* data class StateMachineRunId private constructor(val uuid: UUID) { companion object { - fun createRandom() = StateMachineRunId(UUID.randomUUID()) + fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID()) } } diff --git a/core/src/main/kotlin/com/r3corda/protocols/AbstractStateReplacementProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/AbstractStateReplacementProtocol.kt index 8a86e0747b..50664ca172 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/AbstractStateReplacementProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/AbstractStateReplacementProtocol.kt @@ -7,6 +7,7 @@ import com.r3corda.core.contracts.StateRef import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA +import com.r3corda.core.node.recordTransactions import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.transactions.SignedTransaction @@ -66,7 +67,7 @@ abstract class AbstractStateReplacementProtocol { } val finalTx = stx + signatures - serviceHub.recordTransactions(listOf(finalTx)) + serviceHub.recordTransactions(finalTx) return finalTx.tx.outRef(0) } @@ -164,7 +165,7 @@ abstract class AbstractStateReplacementProtocol { val finalTx = stx + allSignatures finalTx.verifySignatures() - serviceHub.recordTransactions(listOf(finalTx)) + serviceHub.recordTransactions(finalTx) } @Suspendable @@ -219,4 +220,4 @@ class StateReplacementRefused(val identity: Party, val state: StateRef, val deta override fun toString() = "A participant $identity refused to change state $state: " + (detail ?: "no reason provided") } -class StateReplacementException(val error: StateReplacementRefused) : Exception("State change failed - $error") \ No newline at end of file +class StateReplacementException(val error: StateReplacementRefused) : Exception("State change failed - $error") diff --git a/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt index b146b00f9e..0d1e868c9c 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt @@ -3,6 +3,7 @@ package com.r3corda.protocols import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.crypto.Party +import com.r3corda.core.node.recordTransactions import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.transactions.SignedTransaction diff --git a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt index bc1b7a25c8..749fe4a372 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.checkedAdd import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.recordTransactions import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.transactions.LedgerTransaction import com.r3corda.core.transactions.SignedTransaction diff --git a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt index 448d937d3d..cee3398213 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt @@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.recordTransactions import com.r3corda.core.node.services.ServiceType import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue @@ -139,7 +140,7 @@ object TwoPartyDealProtocol { progressTracker.currentStep = RECORDING - serviceHub.recordTransactions(listOf(fullySigned)) + serviceHub.recordTransactions(fullySigned) logger.trace { "Deal stored" } @@ -219,7 +220,7 @@ object TwoPartyDealProtocol { logger.trace { "Signatures received are valid. Deal transaction complete! :-)" } progressTracker.currentStep = RECORDING - serviceHub.recordTransactions(listOf(fullySigned)) + serviceHub.recordTransactions(fullySigned) logger.trace { "Deal transaction stored" } return fullySigned diff --git a/core/src/test/kotlin/com/r3corda/core/protocols/ResolveTransactionsProtocolTest.kt b/core/src/test/kotlin/com/r3corda/core/protocols/ResolveTransactionsProtocolTest.kt index 43ae3d36e5..3f28acbbc4 100644 --- a/core/src/test/kotlin/com/r3corda/core/protocols/ResolveTransactionsProtocolTest.kt +++ b/core/src/test/kotlin/com/r3corda/core/protocols/ResolveTransactionsProtocolTest.kt @@ -5,6 +5,7 @@ import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.crypto.NullSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.recordTransactions import com.r3corda.core.serialization.opaque import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.testing.node.MockNetwork diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 337abb98c1..5fc67a15a0 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -38,10 +38,7 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.PersistentNetworkMapService -import com.r3corda.node.services.persistence.NodeAttachmentService -import com.r3corda.node.services.persistence.PerFileCheckpointStorage -import com.r3corda.node.services.persistence.PerFileTransactionStorage -import com.r3corda.node.services.persistence.StorageServiceImpl +import com.r3corda.node.services.persistence.* import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.SimpleNotaryService @@ -112,8 +109,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap return smm.add(loggerName, logic).resultFuture } - override fun recordTransactions(txs: Iterable) = - recordTransactionsInternal(storage, txs) + override fun recordTransactions(txs: Iterable) = recordTransactionsInternal(storage, txs) } val info: NodeInfo by lazy { @@ -424,14 +420,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap val transactionStorage = PerFileTransactionStorage(dir.resolve("transactions")) _servicesThatAcceptUploads += attachments val (identity, keyPair) = obtainKeyPair(dir) - return Pair(constructStorageService(attachments, transactionStorage, keyPair, identity), checkpointStorage) + val stateMachineTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage() + return Pair( + constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage, keyPair, identity), + checkpointStorage + ) } protected open fun constructStorageService(attachments: NodeAttachmentService, transactionStorage: TransactionStorage, + stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage, keyPair: KeyPair, identity: Party) = - StorageServiceImpl(attachments, transactionStorage, keyPair, identity) + StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage, keyPair, identity) private fun obtainKeyPair(dir: Path): Pair { // Load the private identity key, creating it if necessary. The identity key is a long term well known key that diff --git a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt index a4bfcb378d..4aec40a792 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt @@ -37,4 +37,5 @@ class ServerRPCOps( changes.map { StateMachineUpdate.fromStateMachineChange(it) } ) } + override fun stateMachineRecordedTransactionMapping() = services.storageService.stateMachineRecordedTransactionMapping.track() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt index 226251db7a..badc92a009 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt @@ -80,14 +80,14 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton topic: String, loggerName: String, crossinline protocolFactory: (H) -> ProtocolLogic, - crossinline onResultFuture: (ListenableFuture, H) -> Unit) { + crossinline onResultFuture: ProtocolLogic.(ListenableFuture, H) -> Unit) { net.addMessageHandler(topic, DEFAULT_SESSION_ID, null) { message, reg -> try { val handshake = message.data.deserialize() val protocol = protocolFactory(handshake) protocol.registerSession(handshake) val resultFuture = services.startProtocol(loggerName, protocol) - onResultFuture(resultFuture, handshake) + protocol.onResultFuture(resultFuture, handshake) } catch (e: Exception) { logger.error("Unable to process ${H::class.java.name} message", e) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index 074fbf8a37..05758f4908 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -7,7 +7,8 @@ import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory -import com.r3corda.core.protocols.StateMachineRunId +import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl +import org.slf4j.LoggerFactory interface MessagingServiceInternal : MessagingService { /** @@ -32,6 +33,8 @@ interface MessagingServiceBuilder { fun start(): ListenableFuture } +private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java) + abstract class ServiceHubInternal : ServiceHub { abstract val monitoringService: MonitoringService abstract val protocolLogicRefFactory: ProtocolLogicRefFactory @@ -46,6 +49,14 @@ abstract class ServiceHubInternal : ServiceHub { * @param txs The transactions to record. */ internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable) { + val stateMachineRunId = ProtocolStateMachineImpl.retrieveCurrentStateMachine()?.id + if (stateMachineRunId != null) { + txs.forEach { + storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id) + } + } else { + log.warn("Transaction recorded from outside of a state machine") + } txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) } vaultService.notifyAll(txs.map { it.tx }) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt index 30f9e255ec..095a039596 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt @@ -2,6 +2,8 @@ package com.r3corda.node.services.messaging import com.r3corda.core.contracts.ContractState import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.StateMachineTransactionMapping import com.r3corda.core.node.services.Vault import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.transactions.SignedTransaction @@ -71,4 +73,6 @@ interface CordaRPCOps : RPCOps { */ @RPCReturnsObservables fun verifiedTransactions(): Pair, Observable> + @RPCReturnsObservables + fun stateMachineRecordedTransactionMapping(): Pair, Observable> } diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 4c3c7dfedf..f5b925a4ef 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -6,6 +6,7 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.TopicSession import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.recordTransactions import com.r3corda.core.serialization.serialize import com.r3corda.core.success import com.r3corda.core.transactions.SignedTransaction @@ -81,7 +82,7 @@ object DataVending { }, { future, req -> future.success { - services.recordTransactions(req.tx) + serviceHub.recordTransactions(req.tx) }.failure { throwable -> logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt new file mode 100644 index 0000000000..726af16ea8 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt @@ -0,0 +1,47 @@ +package com.r3corda.node.services.persistence + +import com.r3corda.core.ThreadBox +import com.r3corda.core.bufferUntilSubscribed +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.protocols.StateMachineRunId +import rx.Observable +import rx.subjects.PublishSubject +import java.util.* +import javax.annotation.concurrent.ThreadSafe + +/** + * This is a temporary in-memory storage of a state machine id -> txhash mapping + * + * TODO persist this instead + */ +@ThreadSafe +class InMemoryStateMachineRecordedTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage { + + private val mutex = ThreadBox(object { + val stateMachineTransactionMap = HashMap>() + val updates = PublishSubject.create() + }) + + override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) { + mutex.locked { + stateMachineTransactionMap.getOrPut(stateMachineRunId) { HashSet() }.add(transactionId) + updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId)) + } + } + + override fun track(): + Pair, Observable> { + mutex.locked { + return Pair( + stateMachineTransactionMap.flatMap { entry -> + entry.value.map { + StateMachineTransactionMapping(entry.key, it) + } + }, + updates.bufferUntilSubscribed() + ) + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt index f2f3ba918c..6a6705c06b 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt @@ -2,6 +2,7 @@ package com.r3corda.node.services.persistence import com.r3corda.core.crypto.Party import com.r3corda.core.node.services.AttachmentStorage +import com.r3corda.core.node.services.StateMachineRecordedTransactionMappingStorage import com.r3corda.core.node.services.TransactionStorage import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.serialization.SingletonSerializeAsToken @@ -9,6 +10,7 @@ import java.security.KeyPair open class StorageServiceImpl(override val attachments: AttachmentStorage, override val validatedTransactions: TransactionStorage, + override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage, override val myLegalIdentityKey: KeyPair, override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) : SingletonSerializeAsToken(), TxWritableStorageService diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index a4e8c8fccf..1e6248848b 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -150,4 +150,12 @@ class ProtocolStateMachineImpl(override val id: StateMachineRunId, createTransaction() } + companion object { + /** + * Retrieves our state machine id if we are running a [ProtocolStateMachineImpl]. + */ + fun retrieveCurrentStateMachine(): ProtocolStateMachineImpl<*>? { + return Strand.currentStrand() as? ProtocolStateMachineImpl<*> + } + } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt index 085daf1f93..72712c2425 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt @@ -16,6 +16,7 @@ import com.r3corda.node.utilities.AbstractJDBCHashSet import com.r3corda.node.utilities.JDBCHashedTable import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement +import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey @@ -99,6 +100,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT if (netDelta != Vault.NoUpdate) { mutex.locked { recordUpdate(netDelta) + _updatesPublisher.onNext(netDelta) } } return currentVault diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index ee046fed2f..78823e890a 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -9,9 +9,8 @@ import com.r3corda.core.crypto.SecureHash import com.r3corda.core.days import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.ServiceHub -import com.r3corda.core.node.services.ServiceType -import com.r3corda.core.node.services.TransactionStorage -import com.r3corda.core.node.services.Vault +import com.r3corda.core.node.services.* +import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.WireTransaction import com.r3corda.core.utilities.DUMMY_NOTARY @@ -121,7 +120,7 @@ class TwoPartyTradeProtocolTests { 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) - val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).second + val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerFuture // Everything is on this thread so we can now step through the protocol one step at a time. // Seller Alice already sent a message to Buyer Bob. Pump once: @@ -187,11 +186,14 @@ class TwoPartyTradeProtocolTests { advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, keyPair) { // That constructs the storage service object in a customised way ... - override fun constructStorageService(attachments: NodeAttachmentService, - transactionStorage: TransactionStorage, - keyPair: KeyPair, - identity: Party): StorageServiceImpl { - return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), keyPair, identity) + override fun constructStorageService( + attachments: NodeAttachmentService, + transactionStorage: TransactionStorage, + stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage, + keyPair: KeyPair, + identity: Party + ): StorageServiceImpl { + return StorageServiceImpl(attachments, RecordingTransactionStorage(transactionStorage), stateMachineRecordedTransactionMappingStorage, keyPair, identity) } } } @@ -288,6 +290,69 @@ class TwoPartyTradeProtocolTests { } } + @Test + fun `track() works`() { + + notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY) + aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY) + bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY) + + ledger(aliceNode.services) { + + // Insert a prospectus type attachment into the commercial paper transaction. + val stream = ByteArrayOutputStream() + JarOutputStream(stream).use { + it.putNextEntry(ZipEntry("Prospectus.txt")) + it.write("Our commercial paper is top notch stuff".toByteArray()) + it.closeEntry() + } + val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray())) + + val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second + val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode.services) + val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey, + 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second + val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) + + net.runNetwork() // Clear network map registration messages + + val aliceTxStream = aliceNode.storage.validatedTransactions.track().second + val aliceTxMappings = aliceNode.storage.stateMachineRecordedTransactionMapping.track().second + val (bobResult, aliceResult, bobSmId, aliceSmId) = runBuyerAndSeller("alice's paper".outputStateAndRef()) + + net.runNetwork() + + // We need to declare this here, if we do it inside [expectEvents] kotlin throws an internal compiler error(!). + val aliceTxExpectations = sequence( + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[0].id) + }, + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[2].id) + }, + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[1].id) + } + ) + aliceTxStream.expectEvents { aliceTxExpectations } + val aliceMappingExpectations = sequence( + expect { mapping: StateMachineTransactionMapping -> + require(mapping.stateMachineRunId == aliceSmId) + require(mapping.transactionId == bobsFakeCash[0].id) + }, + expect { mapping: StateMachineTransactionMapping -> + require(mapping.stateMachineRunId == aliceSmId) + require(mapping.transactionId == bobsFakeCash[2].id) + }, + expect { mapping: StateMachineTransactionMapping -> + require(mapping.stateMachineRunId == aliceSmId) + require(mapping.transactionId == bobsFakeCash[1].id) + } + ) + aliceTxMappings.expectEvents { aliceMappingExpectations } + } + } + @Test fun `dependency with error on buyer side`() { ledger { @@ -302,15 +367,21 @@ class TwoPartyTradeProtocolTests { } } + data class RunResult( + val buyerFuture: Future, + val sellerFuture: Future, + val buyerSmId: StateMachineRunId, + val sellerSmId: StateMachineRunId + ) - private fun runBuyerAndSeller(assetToSell: StateAndRef) : Pair, Future> { + private fun runBuyerAndSeller(assetToSell: StateAndRef): RunResult { val buyer = Buyer(aliceNode.info.identity, notaryNode.info.identity, 1000.DOLLARS, CommercialPaper.State::class.java) val seller = Seller(bobNode.info.identity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY) connectProtocols(buyer, seller) // We start the Buyer first, as the Seller sends the first message - val buyerResult = bobNode.smm.add("$TOPIC.buyer", buyer).resultFuture - val sellerResult = aliceNode.smm.add("$TOPIC.seller", seller).resultFuture - return Pair(buyerResult, sellerResult) + val buyerPsm = bobNode.smm.add("$TOPIC.buyer", buyer) + val sellerPsm = aliceNode.smm.add("$TOPIC.seller", seller) + return RunResult(buyerPsm.resultFuture, sellerPsm.resultFuture, buyerPsm.id, sellerPsm.id) } private fun LedgerDSL.runWithError( diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt index 5199290d7c..509d8fe971 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt @@ -6,6 +6,7 @@ import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.node.services.* import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MonitoringService @@ -56,8 +57,7 @@ open class MockServiceHubInternal( private val txStorageService: TxWritableStorageService get() = storage ?: throw UnsupportedOperationException() - override fun recordTransactions(txs: Iterable) = - recordTransactionsInternal(txStorageService, txs) + override fun recordTransactions(txs: Iterable) = recordTransactionsInternal(txStorageService, txs) lateinit var smm: StateMachineManager diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt index 3565e42551..66e7fdbb5a 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt @@ -6,6 +6,7 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.SecureHash import com.r3corda.core.days import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.recordTransactions import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRef import com.r3corda.core.protocols.ProtocolLogicRefFactory diff --git a/node/src/test/kotlin/com/r3corda/node/services/VaultWithCashTest.kt b/node/src/test/kotlin/com/r3corda/node/services/VaultWithCashTest.kt index 4e85b0dc42..957ab779fb 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/VaultWithCashTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/VaultWithCashTest.kt @@ -5,6 +5,7 @@ import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER import com.r3corda.contracts.asset.cashBalances import com.r3corda.contracts.testing.fillWithSomeTestCash import com.r3corda.core.contracts.* +import com.r3corda.core.node.recordTransactions import com.r3corda.core.node.services.VaultService import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.utilities.DUMMY_NOTARY diff --git a/src/main/kotlin/com/r3corda/simulation/TradeSimulation.kt b/src/main/kotlin/com/r3corda/simulation/TradeSimulation.kt index fda013f672..6831acb0a9 100644 --- a/src/main/kotlin/com/r3corda/simulation/TradeSimulation.kt +++ b/src/main/kotlin/com/r3corda/simulation/TradeSimulation.kt @@ -9,6 +9,7 @@ import com.r3corda.core.contracts.DOLLARS import com.r3corda.core.contracts.OwnableState import com.r3corda.core.contracts.`issued by` import com.r3corda.core.days +import com.r3corda.core.node.recordTransactions import com.r3corda.core.seconds import com.r3corda.core.transactions.SignedTransaction import com.r3corda.protocols.TwoPartyTradeProtocol diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/TestDSL.kt b/test-utils/src/main/kotlin/com/r3corda/testing/TestDSL.kt index 7afa7f9e25..529b406819 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/TestDSL.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/TestDSL.kt @@ -3,6 +3,7 @@ package com.r3corda.testing import com.r3corda.core.contracts.* import com.r3corda.core.crypto.* import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.recordTransactions import com.r3corda.core.serialization.serialize import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.TransactionBuilder diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockServices.kt index fea06b139b..262a9d59bc 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockServices.kt @@ -10,9 +10,11 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.* import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.utilities.DUMMY_NOTARY +import com.r3corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage import com.r3corda.testing.MEGA_CORP import com.r3corda.testing.MINI_CORP import rx.Observable @@ -42,6 +44,9 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { } override fun recordTransactions(txs: Iterable) { + txs.forEach { + storageService.stateMachineRecordedTransactionMapping.addMapping(StateMachineRunId.createRandom(), it.id) + } for (stx in txs) { storageService.validatedTransactions.addTransaction(stx) } @@ -116,6 +121,10 @@ class MockAttachmentStorage : AttachmentStorage { } } +class MockStateMachineRecordedTransactionMappingStorage( + val storage: StateMachineRecordedTransactionMappingStorage = InMemoryStateMachineRecordedTransactionMappingStorage() +) : StateMachineRecordedTransactionMappingStorage by storage + open class MockTransactionStorage : TransactionStorage { override fun track(): Pair, Observable> { return Pair(txns.values.toList(), _updatesPublisher) @@ -142,7 +151,8 @@ open class MockTransactionStorage : TransactionStorage { class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(), override val validatedTransactions: TransactionStorage = MockTransactionStorage(), override val myLegalIdentityKey: KeyPair = generateKeyPair(), - override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)) + override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), + override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage()) : SingletonSerializeAsToken(), TxWritableStorageService /**