mirror of
synced 2025-03-23 04:25:19 +00:00
Add wallet service for notifying remote nodes of transactions
This commit is contained in:
@ -71,6 +71,8 @@ class Wallet(val states: List<StateAndRef<ContractState>>) {
* wallet service vends immutable snapshots of the current wallet for working with: if you build a transaction based
* on a wallet that isn't current, be aware that it may end up being invalid if the states that were used have been
* consumed by someone else first!
* Note that transactions we've seen are held by the storage service, not the wallet.
interface WalletService {
@ -33,10 +33,16 @@ class ResolveTransactionsProtocol(private val txHashes: Set<SecureHash>,
private var stx: SignedTransaction? = null
private var wtx: WireTransaction? = null
* Resolve the full history of a transaction and verify it with its dependencies.
constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) {
this.stx = stx
* Resolve the full history of a transaction and verify it with its dependencies.
constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) {
this.wtx = wtx
@ -169,7 +169,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage, services.networkMapCache)
DataVendingService(net, services)
NotaryChangeService(net, smm, services.networkMapCache)
@ -1,14 +1,19 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StorageService
import com.r3corda.core.serialization.serialize
import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.FetchAttachmentsProtocol
import com.r3corda.protocols.FetchDataProtocol
import com.r3corda.protocols.FetchTransactionsProtocol
import com.r3corda.protocols.PartyRequestMessage
import com.r3corda.protocols.ResolveTransactionsProtocol
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
@ -25,11 +30,22 @@ import javax.annotation.concurrent.ThreadSafe
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
class DataVendingService(net: MessagingService, private val storage: StorageService, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both
// should be fixed at the same time.
class DataVendingService(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) {
companion object {
val logger = loggerFor<DataVendingService>()
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
init {
{ req: FetchDataProtocol.Request -> handleTXRequest(req) },
@ -39,6 +55,30 @@ class DataVendingService(net: MessagingService, private val storage: StorageServ
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
@ -68,7 +68,7 @@ open class MockServices(
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected.
DataVendingService(net, storage, networkMapCache)
DataVendingService(net, this)
@ -0,0 +1,105 @@
package com.r3corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.core.contracts.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.MEGA_CORP_KEY
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.node.internal.testing.MockNetwork
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
import javax.annotation.Signed
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
* Tests for the data vending service.
class DataVendingServiceTests {
lateinit var network: MockNetwork
init {
fun setup() {
network = MockNetwork()
class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction)
: ProtocolLogic<Boolean>() {
override val topic: String get() = DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC
override fun call(): Boolean {
val sessionID = random63BitValue()
val req = DataVendingService.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID)
return sendAndReceive<DataVendingService.NotifyTxResponseMessage>(server.identity, 0, sessionID, req).validate { it.accepted }
fun `notify of transaction`() {
val (walletServiceNode, registerNode) = network.createTwoNodes()
val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public
val deposit = registerNode.services.storageService.myLegalIdentity.ref(1)
// Generate an issuance transaction
val ptx = TransactionType.General.Builder()
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// Complete the cash transaction, and then manually relay it
val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted
assertTrue(notifyPsm.get(1, TimeUnit.SECONDS))
// Check the transaction is in the receiving node
val actual = walletServiceNode.services.walletService.currentWallet.states.single()
val expected = tx.tx.outRef<Cash.State>(0)
assertEquals(expected, actual)
* Test that invalid transactions are rejected.
fun `notify failure`() {
val (walletServiceNode, registerNode) = network.createTwoNodes()
val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public
val deposit = MEGA_CORP.ref(1)
// Generate an issuance transaction
val ptx = TransactionType.General.Builder()
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid
val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted
assertFalse(notifyPsm.get(1, TimeUnit.SECONDS))
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
Reference in New Issue
Block a user