Merged in rnicoll-notify-tx-protocol (pull request #259)

Add NotifyTxProtocol
This commit is contained in:
Ross Nicoll 2016-08-03 17:12:55 +01:00
commit 1b8b06a2a4
2 changed files with 50 additions and 28 deletions

View File

@ -1,10 +1,17 @@
package com.r3corda.node.services.persistence
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor
@ -42,12 +49,33 @@ object DataVending {
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
fun notify(net: MessagingService,
myIdentity: Party,
recipient: NodeInfo,
transaction: SignedTransaction): ListenableFuture<Unit> {
val future = SettableFuture.create<Unit>()
val sessionID = random63BitValue()
net.runOnNextMessage(NOTIFY_TX_PROTOCOL_TOPIC, sessionID) { msg ->
// TODO: Can we improve/simplify the response from the remote node?
val data = msg.data.deserialize<NotifyTxResponseMessage>()
if (data.accepted) {
future.set(Unit)
} else {
future.setException(TransactionRejectedError("Transaction ${transaction} rejected by remote party ${recipient.identity}"))
}
}
val msg = NotifyTxRequestMessage(transaction, myIdentity, sessionID)
net.send(net.createMessage(TopicSession(NOTIFY_TX_PROTOCOL_TOPIC, 0), msg.serialize().bits), recipient.address)
return future
}
}
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
class TransactionRejectedError(msg: String) : Exception(msg)
init {
addMessageHandler(FetchTransactionsProtocol.TOPIC,
@ -71,16 +99,20 @@ object DataVending {
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
try {
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure { throwable ->
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
} catch(t: Exception) {
// Already handled by the hooks on the future, ignore
}
}

View File

@ -1,12 +1,10 @@
package com.r3corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.contracts.Amount
import com.r3corda.core.contracts.Issued
import com.r3corda.core.contracts.TransactionType
import com.r3corda.core.contracts.USD
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.utilities.BriefLogFormatter
@ -15,7 +13,7 @@ import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
/**
@ -33,17 +31,6 @@ class DataVendingServiceTests {
network = MockNetwork()
}
class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction)
: ProtocolLogic<Boolean>() {
override val topic: String get() = DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC
@Suspendable
override fun call(): Boolean {
val sessionID = random63BitValue()
val req = DataVending.Service.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID)
return sendAndReceive<DataVending.Service.NotifyTxResponseMessage>(server.identity, DEFAULT_SESSION_ID, sessionID, req).validate { it.accepted }
}
}
@Test
fun `notify of transaction`() {
val (walletServiceNode, registerNode) = network.createTwoNodes()
@ -59,14 +46,15 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was accepted
network.runNetwork()
assertTrue(notifyPsm.get(1, TimeUnit.SECONDS))
notifyPsm.get(1, TimeUnit.SECONDS)
// Check the transaction is in the receiving node
val actual = walletServiceNode.services.walletService.currentWallet.states.single()
val actual = walletServiceNode.services.walletService.currentWallet.states.singleOrNull()
val expected = tx.tx.outRef<Cash.State>(0)
assertEquals(expected, actual)
@ -90,11 +78,13 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was accepted
network.runNetwork()
assertFalse(notifyPsm.get(1, TimeUnit.SECONDS))
val ex = assertFailsWith<java.util.concurrent.ExecutionException> { notifyPsm.get(1, TimeUnit.SECONDS) }
assertTrue(ex.cause is DataVending.Service.TransactionRejectedError)
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)