Merged in rnicoll-wallet-notify-transaction (pull request #237)

Add wallet service for notifying remote nodes of transactions
This commit is contained in:
Ross Nicoll 2016-07-27 17:45:21 +01:00
commit 2a0066ae74
6 changed files with 158 additions and 5 deletions

View File

@ -71,6 +71,8 @@ class Wallet(val states: List<StateAndRef<ContractState>>) {
* wallet service vends immutable snapshots of the current wallet for working with: if you build a transaction based * wallet service vends immutable snapshots of the current wallet for working with: if you build a transaction based
* on a wallet that isn't current, be aware that it may end up being invalid if the states that were used have been * on a wallet that isn't current, be aware that it may end up being invalid if the states that were used have been
* consumed by someone else first! * consumed by someone else first!
*
* Note that transactions we've seen are held by the storage service, not the wallet.
*/ */
interface WalletService { interface WalletService {
/** /**

View File

@ -33,10 +33,16 @@ class ResolveTransactionsProtocol(private val txHashes: Set<SecureHash>,
private var stx: SignedTransaction? = null private var stx: SignedTransaction? = null
private var wtx: WireTransaction? = null private var wtx: WireTransaction? = null
/**
* Resolve the full history of a transaction and verify it with its dependencies.
*/
constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) { constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) {
this.stx = stx this.stx = stx
} }
/**
* Resolve the full history of a transaction and verify it with its dependencies.
*/
constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) { constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) {
this.wtx = wtx this.wtx = wtx
} }

View File

@ -169,7 +169,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// This object doesn't need to be referenced from this class because it registers handlers on the network // This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected. // service and so that keeps it from being collected.
DataVendingService(net, storage, services.networkMapCache) DataVendingService(net, services)
NotaryChangeService(net, smm, services.networkMapCache) NotaryChangeService(net, smm, services.networkMapCache)
buildAdvertisedServices() buildAdvertisedServices()

View File

@ -1,14 +1,19 @@
package com.r3corda.node.services.persistence package com.r3corda.node.services.persistence
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.serialization.serialize
import com.r3corda.core.node.services.StorageService import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.FetchAttachmentsProtocol import com.r3corda.protocols.FetchAttachmentsProtocol
import com.r3corda.protocols.FetchDataProtocol import com.r3corda.protocols.FetchDataProtocol
import com.r3corda.protocols.FetchTransactionsProtocol import com.r3corda.protocols.FetchTransactionsProtocol
import com.r3corda.protocols.PartyRequestMessage
import com.r3corda.protocols.ResolveTransactionsProtocol
import java.io.InputStream import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
@ -25,11 +30,22 @@ import javax.annotation.concurrent.ThreadSafe
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/ */
@ThreadSafe @ThreadSafe
class DataVendingService(net: MessagingService, private val storage: StorageService, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) { // TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both
// should be fixed at the same time.
class DataVendingService(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) {
companion object { companion object {
val logger = loggerFor<DataVendingService>() val logger = loggerFor<DataVendingService>()
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
} }
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
init { init {
addMessageHandler(FetchTransactionsProtocol.TOPIC, addMessageHandler(FetchTransactionsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleTXRequest(req) }, { req: FetchDataProtocol.Request -> handleTXRequest(req) },
@ -39,6 +55,30 @@ class DataVendingService(net: MessagingService, private val storage: StorageServ
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) }, { req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) } { message, e -> logger.error("Failure processing data vending request.", e) }
) )
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
} }
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> { private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {

View File

@ -68,7 +68,7 @@ open class MockServices(
if (net != null && storage != null) { if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected. // on the networking service, so that will keep it from being collected.
DataVendingService(net, storage, networkMapCache) DataVendingService(net, this)
} }
} }
} }

View File

@ -0,0 +1,105 @@
package com.r3corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.core.contracts.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.MEGA_CORP_KEY
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.node.internal.testing.MockNetwork
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
import javax.annotation.Signed
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
/**
* Tests for the data vending service.
*/
class DataVendingServiceTests {
lateinit var network: MockNetwork
init {
BriefLogFormatter.init()
}
@Before
fun setup() {
network = MockNetwork()
}
class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction)
: ProtocolLogic<Boolean>() {
override val topic: String get() = DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC
@Suspendable
override fun call(): Boolean {
val sessionID = random63BitValue()
val req = DataVendingService.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID)
return sendAndReceive<DataVendingService.NotifyTxResponseMessage>(server.identity, 0, sessionID, req).validate { it.accepted }
}
}
@Test
fun `notify of transaction`() {
val (walletServiceNode, registerNode) = network.createTwoNodes()
val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public
val deposit = registerNode.services.storageService.myLegalIdentity.ref(1)
network.runNetwork()
// Generate an issuance transaction
val ptx = TransactionType.General.Builder()
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// Complete the cash transaction, and then manually relay it
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted
network.runNetwork()
assertTrue(notifyPsm.get(1, TimeUnit.SECONDS))
// Check the transaction is in the receiving node
val actual = walletServiceNode.services.walletService.currentWallet.states.single()
val expected = tx.tx.outRef<Cash.State>(0)
assertEquals(expected, actual)
}
/**
* Test that invalid transactions are rejected.
*/
@Test
fun `notify failure`() {
val (walletServiceNode, registerNode) = network.createTwoNodes()
val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public
val deposit = MEGA_CORP.ref(1)
network.runNetwork()
// Generate an issuance transaction
val ptx = TransactionType.General.Builder()
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted
network.runNetwork()
assertFalse(notifyPsm.get(1, TimeUnit.SECONDS))
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
}
}