diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 5f0247b791..5c22ec20f7 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -71,6 +71,8 @@ class Wallet(val states: List>) { * wallet service vends immutable snapshots of the current wallet for working with: if you build a transaction based * on a wallet that isn't current, be aware that it may end up being invalid if the states that were used have been * consumed by someone else first! + * + * Note that transactions we've seen are held by the storage service, not the wallet. */ interface WalletService { /** diff --git a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt index 2e09d2c348..3ed8ceb66c 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt @@ -33,10 +33,16 @@ class ResolveTransactionsProtocol(private val txHashes: Set, private var stx: SignedTransaction? = null private var wtx: WireTransaction? = null + /** + * Resolve the full history of a transaction and verify it with its dependencies. + */ constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) { this.stx = stx } + /** + * Resolve the full history of a transaction and verify it with its dependencies. + */ constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) { this.wtx = wtx } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index cf494cc40d..e8a14d8a70 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -169,7 +169,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // This object doesn't need to be referenced from this class because it registers handlers on the network // service and so that keeps it from being collected. - DataVendingService(net, storage, services.networkMapCache) + DataVendingService(net, services) NotaryChangeService(net, smm, services.networkMapCache) buildAdvertisedServices() diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 1468897f9a..a3ef3f630e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -1,14 +1,19 @@ package com.r3corda.node.services.persistence import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.Party +import com.r3corda.core.failure import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.node.services.NetworkMapCache -import com.r3corda.core.node.services.StorageService +import com.r3corda.core.serialization.serialize +import com.r3corda.core.success import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.api.AbstractNodeService +import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.protocols.FetchAttachmentsProtocol import com.r3corda.protocols.FetchDataProtocol import com.r3corda.protocols.FetchTransactionsProtocol +import com.r3corda.protocols.PartyRequestMessage +import com.r3corda.protocols.ResolveTransactionsProtocol import java.io.InputStream import javax.annotation.concurrent.ThreadSafe @@ -25,11 +30,22 @@ import javax.annotation.concurrent.ThreadSafe * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. */ @ThreadSafe -class DataVendingService(net: MessagingService, private val storage: StorageService, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) { +// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because +// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both +// should be fixed at the same time. +class DataVendingService(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) { companion object { val logger = loggerFor() + + /** Topic for messages notifying a node of a new transaction */ + val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx" } + val storage = services.storageService + + data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage + data class NotifyTxResponseMessage(val accepted: Boolean) + init { addMessageHandler(FetchTransactionsProtocol.TOPIC, { req: FetchDataProtocol.Request -> handleTXRequest(req) }, @@ -39,6 +55,30 @@ class DataVendingService(net: MessagingService, private val storage: StorageServ { req: FetchDataProtocol.Request -> handleAttachmentRequest(req) }, { message, e -> logger.error("Failure processing data vending request.", e) } ) + addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC, + { req: NotifyTxRequestMessage -> handleTXNotification(req) }, + { message, e -> logger.error("Failure processing data vending request.", e) } + ) + } + + private fun handleTXNotification(req: NotifyTxRequestMessage): Unit { + // TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction + // includes us in any outside that list. Potentially just if it includes any outside that list at all. + + // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming + // cash without from unknown parties? + + services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty)) + .success { + services.recordTransactions(req.tx) + val resp = NotifyTxResponseMessage(true) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits) + net.send(msg, req.getReplyTo(services.networkMapCache)) + }.failure { + val resp = NotifyTxResponseMessage(false) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits) + net.send(msg, req.getReplyTo(services.networkMapCache)) + } } private fun handleTXRequest(req: FetchDataProtocol.Request): List { diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index 242d41dbf2..9b6203ec49 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -68,7 +68,7 @@ open class MockServices( if (net != null && storage != null) { // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener // on the networking service, so that will keep it from being collected. - DataVendingService(net, storage, networkMapCache) + DataVendingService(net, this) } } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt new file mode 100644 index 0000000000..61c45fd9ab --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt @@ -0,0 +1,105 @@ +package com.r3corda.node.services.persistence + +import co.paralleluniverse.fibers.Suspendable +import com.r3corda.contracts.asset.Cash +import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER +import com.r3corda.core.contracts.* +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.random63BitValue +import com.r3corda.core.testing.DUMMY_NOTARY +import com.r3corda.core.testing.MEGA_CORP +import com.r3corda.core.testing.MEGA_CORP_KEY +import com.r3corda.core.utilities.BriefLogFormatter +import com.r3corda.node.internal.testing.MockNetwork +import org.junit.Before +import org.junit.Test +import java.util.concurrent.TimeUnit +import javax.annotation.Signed +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +/** + * Tests for the data vending service. + */ +class DataVendingServiceTests { + lateinit var network: MockNetwork + + init { + BriefLogFormatter.init() + } + + @Before + fun setup() { + network = MockNetwork() + } + + class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction) + : ProtocolLogic() { + override val topic: String get() = DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC + @Suspendable + override fun call(): Boolean { + val sessionID = random63BitValue() + val req = DataVendingService.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID) + return sendAndReceive(server.identity, 0, sessionID, req).validate { it.accepted } + } + } + + @Test + fun `notify of transaction`() { + val (walletServiceNode, registerNode) = network.createTwoNodes() + val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public + val deposit = registerNode.services.storageService.myLegalIdentity.ref(1) + network.runNetwork() + + // Generate an issuance transaction + val ptx = TransactionType.General.Builder() + Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY) + + // Complete the cash transaction, and then manually relay it + ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) + val tx = ptx.toSignedTransaction() + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx)) + + // Check it was accepted + network.runNetwork() + assertTrue(notifyPsm.get(1, TimeUnit.SECONDS)) + + // Check the transaction is in the receiving node + val actual = walletServiceNode.services.walletService.currentWallet.states.single() + val expected = tx.tx.outRef(0) + + assertEquals(expected, actual) + } + + /** + * Test that invalid transactions are rejected. + */ + @Test + fun `notify failure`() { + val (walletServiceNode, registerNode) = network.createTwoNodes() + val beneficiary = walletServiceNode.services.storageService.myLegalIdentityKey.public + val deposit = MEGA_CORP.ref(1) + network.runNetwork() + + // Generate an issuance transaction + val ptx = TransactionType.General.Builder() + Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY) + + // The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid + ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) + val tx = ptx.toSignedTransaction(false) + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx)) + + // Check it was accepted + network.runNetwork() + assertFalse(notifyPsm.get(1, TimeUnit.SECONDS)) + + // Check the transaction is not in the receiving node + assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) + } +} \ No newline at end of file