Merged in aslemmer-generic-protocol-start (pull request #465)

Aslemmer generic protocol start
This commit is contained in:
Andras Slemmer 2016-11-15 16:51:57 +00:00
commit 9078676521
36 changed files with 529 additions and 354 deletions

View File

@ -3,7 +3,10 @@ package net.corda.client
import net.corda.client.model.NodeMonitorModel
import net.corda.client.model.ProgressTrackingEvent
import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.*
import net.corda.core.contracts.Amount
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.ServiceInfo
@ -13,13 +16,15 @@ import net.corda.core.protocols.StateMachineRunId
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.node.driver.driver
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.StateMachineUpdate
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.sequence
@ -43,7 +48,7 @@ class NodeMonitorModelTest {
lateinit var transactions: Observable<SignedTransaction>
lateinit var vaultUpdates: Observable<Vault.Update>
lateinit var networkMapUpdates: Observable<NetworkMapCache.MapChange>
lateinit var clientToService: Observer<ClientToServiceCommand>
lateinit var clientToService: Observer<CashCommand>
lateinit var newNode: (String) -> NodeInfo
@Before
@ -51,7 +56,7 @@ class NodeMonitorModelTest {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val cashUser = User("user1", "test", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION))
val cashUser = User("user1", "test", permissions = setOf(startProtocolPermission<CashProtocol>()))
val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser))
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
@ -106,7 +111,7 @@ class NodeMonitorModelTest {
@Test
fun `cash issue works end to end`() {
clientToService.onNext(ClientToServiceCommand.IssueCash(
clientToService.onNext(CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity,
@ -131,14 +136,14 @@ class NodeMonitorModelTest {
@Test
fun `cash issue and move`() {
clientToService.onNext(ClientToServiceCommand.IssueCash(
clientToService.onNext(CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity,
notary = notaryNode.notaryIdentity
))
clientToService.onNext(ClientToServiceCommand.PayCash(
clientToService.onNext(CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.legalIdentity
))

View File

@ -5,7 +5,7 @@ import net.corda.core.contracts.*
import net.corda.core.crypto.Party
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.TransactionBuilder
import java.time.Instant
import net.corda.protocols.CashCommand
/**
* [Generator]s for incoming/outgoing events to/from the [WalletMonitorService]. Internally it keeps track of owned
@ -65,7 +65,7 @@ class EventGenerator(
val issueCashGenerator =
amountGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef ->
ClientToServiceCommand.IssueCash(
CashCommand.IssueCash(
amount,
issueRef,
to,
@ -77,7 +77,7 @@ class EventGenerator(
amountIssuedGenerator.combine(
partyGenerator
) { amountIssued, recipient ->
ClientToServiceCommand.PayCash(
CashCommand.PayCash(
amount = amountIssued,
recipient = recipient
)
@ -85,7 +85,7 @@ class EventGenerator(
val exitCashGenerator =
amountIssuedGenerator.map {
ClientToServiceCommand.ExitCash(
CashCommand.ExitCash(
it.withoutIssuer(),
it.token.issuer.reference
)

View File

@ -1,8 +1,8 @@
package net.corda.client.model
import com.google.common.net.HostAndPort
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.CordaRPCClient
import net.corda.core.contracts.ClientToServiceCommand
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
@ -12,7 +12,9 @@ import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.StateMachineInfo
import net.corda.node.services.messaging.StateMachineUpdate
import javafx.beans.property.SimpleObjectProperty
import net.corda.node.services.messaging.startProtocol
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import rx.Observable
import rx.subjects.PublishSubject
@ -46,8 +48,8 @@ class NodeMonitorModel {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<NetworkMapCache.MapChange> = networkMapSubject
private val clientToServiceSource = PublishSubject.create<ClientToServiceCommand>()
val clientToService: PublishSubject<ClientToServiceCommand> = clientToServiceSource
private val clientToServiceSource = PublishSubject.create<CashCommand>()
val clientToService: PublishSubject<CashCommand> = clientToServiceSource
val proxyObservable = SimpleObjectProperty<CordaRPCOps?>()
@ -98,7 +100,7 @@ class NodeMonitorModel {
// Client -> Service
clientToServiceSource.subscribe {
proxy.executeCommand(it)
proxy.startProtocol(::CashProtocol, it)
}
proxyObservable.set(proxy)
}

View File

@ -69,7 +69,7 @@ class ClientRPCInfrastructureTests {
serverSession.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
producer = serverSession.createProducer()
val userService = object : RPCUserService {
override fun getUser(usename: String): User? = throw UnsupportedOperationException()
override fun getUser(username: String): User? = throw UnsupportedOperationException()
override val users: List<User> get() = throw UnsupportedOperationException()
}
val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService) {

View File

@ -99,6 +99,21 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
}
}
fun <A> ListenableFuture<A>.toObservable(): Observable<A> {
return Observable.create { subscriber ->
then {
try {
subscriber.onNext(get())
subscriber.onCompleted()
} catch (e: ExecutionException) {
subscriber.onError(e.cause!!)
} catch (t: Throwable) {
subscriber.onError(t)
}
}
}
}
/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */
operator fun Path.div(other: String): Path = resolve(other)
fun Path.createDirectory(vararg attrs: FileAttribute<*>): Path = Files.createDirectory(this, *attrs)

View File

@ -1,49 +0,0 @@
package net.corda.core.contracts
import net.corda.core.crypto.Party
import net.corda.core.serialization.OpaqueBytes
import java.security.PublicKey
import java.util.*
/**
* A command from the monitoring client, to the node.
*
* @param id ID used to tag event(s) resulting from a command.
*/
sealed class ClientToServiceCommand(val id: UUID) {
/**
* Issue cash state objects.
*
* @param amount the amount of currency to issue on to the ledger.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param recipient the party to issue the cash to.
* @param notary the notary to use for this transaction.
* @param id the ID to be provided in events resulting from this request.
*/
class IssueCash(val amount: Amount<Currency>,
val issueRef: OpaqueBytes,
val recipient: Party,
val notary: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
/**
* Pay cash to someone else.
*
* @param amount the amount of currency to issue on to the ledger.
* @param recipient the party to issue the cash to.
* @param id the ID to be provided in events resulting from this request.
*/
class PayCash(val amount: Amount<Issued<Currency>>, val recipient: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
/**
* Exit cash from the ledger.
*
* @param amount the amount of currency to exit from the ledger.
* @param issueRef the reference previously specified on the issuance.
* @param id the ID to be provided in events resulting from this request.
*/
class ExitCash(val amount: Amount<Currency>, val issueRef: OpaqueBytes,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
}

View File

@ -1,12 +1,12 @@
package net.corda.core.node
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionState
import net.corda.core.messaging.MessagingService
import net.corda.core.node.services.*
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.transactions.SignedTransaction
import java.security.KeyPair
import java.time.Clock
@ -53,7 +53,7 @@ interface ServiceHub {
*
* @throws IllegalProtocolLogicException or IllegalArgumentException if there are problems with the [logicType] or [args].
*/
fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T>
fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolStateMachine<T>
/**
* Helper property to shorten code for fetching the Node's KeyPair associated with the

View File

@ -46,7 +46,7 @@ class ProtocolLogicRefFactory(private val protocolWhitelist: Map<String, Set<Str
return
}
// TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check
require(protocolWhitelist[className]!!.contains(argClassName)) { "Args to $className must have types on the args whitelist: $argClassName" }
require(protocolWhitelist[className]!!.contains(argClassName)) { "Args to $className must have types on the args whitelist: $argClassName, but it has ${protocolWhitelist[className]}" }
}
/**

View File

@ -1,7 +1,6 @@
package net.corda.protocols
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.ClientToServiceCommand
import net.corda.core.crypto.Party
import net.corda.core.node.recordTransactions
import net.corda.core.protocols.ProtocolLogic
@ -13,19 +12,13 @@ import net.corda.core.transactions.SignedTransaction
* [FinalityProtocol].
*
* @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
*/
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class BroadcastTransactionProtocol(val notarisedTransaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>) : ProtocolLogic<Unit>() {
data class NotifyTxRequest(val tx: SignedTransaction, val events: Set<ClientToServiceCommand>)
data class NotifyTxRequest(val tx: SignedTransaction)
@Suspendable
override fun call() {
@ -33,7 +26,7 @@ class BroadcastTransactionProtocol(val notarisedTransaction: SignedTransaction,
serviceHub.recordTransactions(notarisedTransaction)
// TODO: Messaging layer should handle this broadcast for us
val msg = NotifyTxRequest(notarisedTransaction, events)
val msg = NotifyTxRequest(notarisedTransaction)
participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant ->
send(participant, msg)
}

View File

@ -1,7 +1,6 @@
package net.corda.protocols
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.ClientToServiceCommand
import net.corda.core.crypto.Party
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.transactions.SignedTransaction
@ -12,16 +11,10 @@ import net.corda.core.utilities.ProgressTracker
* Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties.
*
* @param transaction to commit.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
*/
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class FinalityProtocol(val transaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>,
override val progressTracker: ProgressTracker = tracker()): ProtocolLogic<Unit>() {
companion object {
@ -46,7 +39,7 @@ class FinalityProtocol(val transaction: SignedTransaction,
// Let everyone else know about the transaction
progressTracker.currentStep = BROADCASTING
subProtocol(BroadcastTransactionProtocol(notarisedTransaction, events, participants))
subProtocol(BroadcastTransactionProtocol(notarisedTransaction, participants))
}
private fun needsNotarySignature(stx: SignedTransaction) = stx.tx.notary != null && hasNoNotarySignature(stx)

View File

@ -19,7 +19,7 @@ class BroadcastTransactionProtocolTest {
class NotifyTxRequestMessageGenerator : Generator<NotifyTxRequest>(NotifyTxRequest::class.java) {
override fun generate(random: SourceOfRandomness, status: GenerationStatus): NotifyTxRequest {
return NotifyTxRequest(tx = SignedTransactionGenerator().generate(random, status), events = setOf())
return NotifyTxRequest(tx = SignedTransactionGenerator().generate(random, status))
}
}

View File

@ -49,7 +49,7 @@ class ResolveTransactionsProtocolTest {
fun `resolve from two hashes`() {
val (stx1, stx2) = makeTransactions()
val p = ResolveTransactionsProtocol(setOf(stx2.id), a.info.legalIdentity)
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
val results = future.get()
assertEquals(listOf(stx1.id, stx2.id), results.map { it.id })
@ -63,7 +63,7 @@ class ResolveTransactionsProtocolTest {
fun `dependency with an error`() {
val stx = makeTransactions(signFirstTX = false).second
val p = ResolveTransactionsProtocol(setOf(stx.id), a.info.legalIdentity)
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
assertFailsWith(SignatureException::class) {
rootCauseExceptions { future.get() }
@ -74,7 +74,7 @@ class ResolveTransactionsProtocolTest {
fun `resolve from a signed transaction`() {
val (stx1, stx2) = makeTransactions()
val p = ResolveTransactionsProtocol(stx2, a.info.legalIdentity)
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
future.get()
databaseTransaction(b.database) {
@ -101,7 +101,7 @@ class ResolveTransactionsProtocolTest {
}
val p = ResolveTransactionsProtocol(setOf(cursor.id), a.info.legalIdentity)
p.transactionCountLimit = 40
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
assertFailsWith<ResolveTransactionsProtocol.ExcessivelyLargeTransactionGraph> {
rootCauseExceptions { future.get() }
@ -129,7 +129,7 @@ class ResolveTransactionsProtocolTest {
}
val p = ResolveTransactionsProtocol(setOf(stx3.id), a.info.legalIdentity)
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
future.get()
}
@ -139,7 +139,7 @@ class ResolveTransactionsProtocolTest {
val id = a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open())
val stx2 = makeTransactions(withAttachment = id).second
val p = ResolveTransactionsProtocol(stx2, a.info.legalIdentity)
val future = b.services.startProtocol(p)
val future = b.services.startProtocol(p).resultFuture
net.runNetwork()
future.get()
assertNotNull(b.services.storageService.attachments.openAttachment(id))

View File

@ -19,6 +19,14 @@ repositories {
}
}
sourceSets {
main {
resources {
srcDir "../../../config/dev"
}
}
}
dependencies {
compile project(':core')
compile project(':client')

View File

@ -1,15 +1,32 @@
package net.corda.docs
import com.google.common.net.HostAndPort
import net.corda.client.CordaRPCClient
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.div
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.node.driver.driver
import net.corda.node.services.User
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.startProtocol
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import org.graphstream.graph.Edge
import org.graphstream.graph.Node
import org.graphstream.graph.implementations.SingleGraph
import org.graphstream.graph.implementations.MultiGraph
import rx.Observable
import java.nio.file.Paths
import java.util.concurrent.CompletableFuture
import java.util.*
import kotlin.concurrent.thread
/**
* This is example code for the Client RPC API tutorial. The START/END comments are important and used by the documentation!
@ -22,24 +39,32 @@ enum class PrintOrVisualise {
}
fun main(args: Array<String>) {
if (args.size < 2) {
throw IllegalArgumentException("Usage: <binary> <node address> [Print|Visualise]")
if (args.size < 1) {
throw IllegalArgumentException("Usage: <binary> [Print|Visualise]")
}
val nodeAddress = HostAndPort.fromString(args[0])
val printOrVisualise = PrintOrVisualise.valueOf(args[1])
val printOrVisualise = PrintOrVisualise.valueOf(args[0])
val baseDirectory = Paths.get("build/rpc-api-tutorial")
val user = User("user", "password", permissions = setOf(startProtocolPermission<CashProtocol>()))
driver(driverDirectory = baseDirectory) {
startNode("Notary", advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)))
val node = startNode("Alice", rpcUsers = listOf(user)).get()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath = Paths.get("build/trader-demo/buyer/certificates")
override val certificatesPath = baseDirectory / "Alice" / "certificates"
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
}
// END 1
// START 2
val username = System.console().readLine("Enter username: ")
val password = String(System.console().readPassword("Enter password: "))
val client = CordaRPCClient(nodeAddress, sslConfig)
client.start(username, password)
val client = CordaRPCClient(FullNodeConfiguration(node.config).artemisAddress, sslConfig)
client.start("user", "password")
val proxy = client.proxy()
thread {
generateTransactions(proxy)
}
// END 2
// START 3
@ -55,12 +80,11 @@ fun main(args: Array<String>) {
println("EDGE ${input.txhash} ${transaction.id}")
}
}
CompletableFuture<Unit>().get() // block indefinitely
}
// END 4
// START 5
PrintOrVisualise.Visualise -> {
val graph = SingleGraph("transactions")
val graph = MultiGraph("transactions")
transactions.forEach { transaction ->
graph.addNode<Node>("${transaction.id}")
}
@ -78,5 +102,37 @@ fun main(args: Array<String>) {
graph.display()
}
}
waitForAllNodesToFinish()
}
}
// END 5
// START 6
fun generateTransactions(proxy: CordaRPCOps) {
var ownedQuantity = proxy.vaultAndUpdates().first.fold(0L) { sum, state ->
sum + (state.state.data as Cash.State).amount.quantity
}
val issueRef = OpaqueBytes.of(0)
val notary = proxy.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val me = proxy.nodeIdentity().legalIdentity
val meAndRef = PartyAndReference(me, issueRef)
while (true) {
Thread.sleep(1000)
val random = SplittableRandom()
val n = random.nextDouble()
if (ownedQuantity > 10000 && n > 0.8) {
val quantity = Math.abs(random.nextLong()) % 2000
proxy.startProtocol(::CashProtocol, CashCommand.ExitCash(Amount(quantity, USD), issueRef))
ownedQuantity -= quantity
} else if (ownedQuantity > 1000 && n < 0.7) {
val quantity = Math.abs(random.nextLong() % Math.min(ownedQuantity, 2000))
proxy.startProtocol(::CashProtocol, CashCommand.PayCash(Amount(quantity, Issued(meAndRef, USD)), me))
} else {
val quantity = Math.abs(random.nextLong() % 1000)
proxy.startProtocol(::CashProtocol, CashCommand.IssueCash(Amount(quantity, USD), issueRef, me, notary))
ownedQuantity += quantity
}
}
}
// END 6

View File

@ -1,31 +1,33 @@
.. _graphstream: http://graphstream-project.org/
Client RPC API
==============
Client RPC API Tutorial
=======================
In this tutorial we will build a simple command line utility that connects to a node and dumps the transaction graph to
the standard output. We will then put some simple visualisation on top. For an explanation on how the RPC works see
:doc:`clientrpc`.
In this tutorial we will build a simple command line utility that
connects to a node, creates some Cash transactions and meanwhile dumps
the transaction graph to the standard output. We will then put some
simple visualisation on top. For an explanation on how the RPC works
see :doc:`clientrpc`.
We start off by connecting to the node itself. For the purposes of the tutorial we will run the Trader demo on some
local port and connect to the Buyer side. We will pass in the address as a command line argument. To connect to the node
we also need to access the certificates of the node, we will access the node's ``certificates`` directory directly.
We start off by connecting to the node itself. For the purposes of the tutorial we will use the Driver to start up a notary and a node that issues/exits and moves Cash around for herself. To authenticate we will use the certificates of the nodes directly.
.. literalinclude:: example-code/src/main/kotlin/net.corda.docs/ClientRpcTutorial.kt
Note how we configure the node to create a user that has permission to start the CashProtocol.
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 1
:end-before: END 1
Now we can connect to the node itself using a valid RPC login. By default the user `user1` is available with password `test`.
Now we can connect to the node itself using a valid RPC login. We login using the configured user.
.. literalinclude:: example-code/src/main/kotlin/net.corda.docs/ClientRpcTutorial.kt
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 2
:end-before: END 2
``proxy`` now exposes the full RPC interface of the node:
We start generating transactions in a different thread (``generateTransactions`` to be defined later) using ``proxy``, which exposes the full RPC interface of the node:
.. literalinclude:: ../../node/src/main/kotlin/net.corda.node/services/messaging/CordaRPCOps.kt
.. literalinclude:: ../../node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCOps.kt
:language: kotlin
:start-after: interface CordaRPCOps
:end-before: }
@ -34,7 +36,7 @@ The one we need in order to dump the transaction graph is ``verifiedTransactions
RPC will return a list of transactions and an Observable stream. This is a general pattern, we query some data and the
node will return the current snapshot and future updates done to it.
.. literalinclude:: example-code/src/main/kotlin/net.corda.docs/ClientRpcTutorial.kt
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 3
:end-before: END 3
@ -43,41 +45,37 @@ The graph will be defined by nodes and edges between them. Each node represents
output-input relations. For now let's just print ``NODE <txhash>`` for the former and ``EDGE <txhash> <txhash>`` for the
latter.
.. literalinclude:: example-code/src/main/kotlin/net.corda.docs/ClientRpcTutorial.kt
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 4
:end-before: END 4
Now we can start the trader demo as per described in :doc:`running-the-demos`::
# Build the demo
./gradlew installDist
# Start the buyer
./build/install/r3prototyping/bin/trader-demo --role=BUYER
Now we just need to create the transactions themselves!
In another terminal we can connect to it with our client::
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 6
:end-before: END 6
# Connect to localhost:31337
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial localhost:31337 Print
We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault.
We should see some ``NODE``-s printed. This is because the buyer self-issues some cash for the demo.
Unless we ran the seller before we shouldn't see any ``EDGE``-s because the cash hasn't been spent yet.
Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.
In another terminal we can now start the seller::
The RPC we need to initiate a Cash transaction is ``startProtocolDynamic`` which may start an arbitrary protocol, given sufficient permissions to do so. We won't use this function directly, but rather a type-safe wrapper around it ``startProtocol`` that type-checks the arguments for us.
# Start sellers in a loop
for i in {0..9} ; do ./build/install/r3prototyping/bin/trader-demo --role=SELLER ; done
Finally we have everything in place: we start a couple of nodes, connect to them, and start creating transactions while listening on successfully created ones, which are dumped to the console. We just need to run it!:
We should start seeing new ``NODE``-s and ``EDGE``-s appearing.
# Build the example
./gradlew docs/source/example-code:installDist
# Start it
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print
Now let's try to visualise the transaction graph. We will use a graph drawing library called graphstream_
.. literalinclude:: example-code/src/main/kotlin/net.corda.docs/ClientRpcTutorial.kt
.. literalinclude:: example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt
:language: kotlin
:start-after: START 5
:end-before: END 5
If we run the client with ``Visualise`` we should see a simple graph being drawn as new transactions are being created
by the seller runs.
That's it! We saw how to connect to the node and stream data from it.
If we run the client with ``Visualise`` we should see a simple random graph being drawn as new transactions are being created.

View File

@ -1,6 +1,7 @@
apply plugin: 'kotlin'
apply plugin: CanonicalizerPlugin
apply plugin: DefaultPublishTasks
apply plugin: QuasarPlugin
repositories {
mavenLocal()

View File

@ -0,0 +1,168 @@
package net.corda.protocols
import co.paralleluniverse.fibers.Suspendable
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.keys
import net.corda.core.crypto.toStringShort
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import java.security.KeyPair
import java.util.*
/**
* Initiates a protocol that produces an Issue/Move or Exit Cash transaction.
*
* @param command Indicates what Cash transaction to create with what parameters.
*/
class CashProtocol(val command: CashCommand): ProtocolLogic<CashProtocolResult>() {
@Suspendable
override fun call(): CashProtocolResult {
return when (command) {
is CashCommand.IssueCash -> issueCash(command)
is CashCommand.PayCash -> initiatePayment(command)
is CashCommand.ExitCash -> exitCash(command)
}
}
// TODO check with the recipient if they want to accept the cash.
@Suspendable
private fun initiatePayment(req: CashCommand.PayCash): CashProtocolResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
val (spendTX, keysForSigning) = serviceHub.vaultService.generateSpend(builder,
req.amount.withoutIssuer(), req.recipient.owningKey, setOf(req.amount.token.issuer.party))
keysForSigning.keys.forEach {
val key = serviceHub.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
}
val tx = spendTX.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req.recipient))
subProtocol(protocol)
return CashProtocolResult.Success(
psm.id,
tx,
"Cash payment transaction generated"
)
} catch(ex: InsufficientBalanceException) {
return CashProtocolResult.Failed(ex.message ?: "Insufficient balance")
}
}
@Suspendable
private fun exitCash(req: CashCommand.ExitCash): CashProtocolResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
try {
val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef)
Cash().generateExit(builder, req.amount.issuedBy(issuer),
serviceHub.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
val myKey = serviceHub.legalIdentityKey
builder.signWith(myKey)
// Work out who the owners of the burnt states were
val inputStatesNullable = serviceHub.vaultService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
}
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { serviceHub.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
subProtocol(FinalityProtocol(tx, participants))
return CashProtocolResult.Success(
psm.id,
tx,
"Cash destruction transaction generated"
)
} catch (ex: InsufficientBalanceException) {
return CashProtocolResult.Failed(ex.message ?: "Insufficient balance")
}
}
@Suspendable
private fun issueCash(req: CashCommand.IssueCash): CashProtocolResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
val myKey = serviceHub.legalIdentityKey
builder.signWith(myKey)
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
subProtocol(BroadcastTransactionProtocol(tx, setOf(req.recipient)))
return CashProtocolResult.Success(
psm.id,
tx,
"Cash issuance completed"
)
}
}
/**
* A command to initiate the Cash protocol with.
*/
sealed class CashCommand {
/**
* Issue cash state objects.
*
* @param amount the amount of currency to issue on to the ledger.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param recipient the party to issue the cash to.
* @param notary the notary to use for this transaction.
*/
class IssueCash(val amount: Amount<Currency>,
val issueRef: OpaqueBytes,
val recipient: Party,
val notary: Party) : CashCommand()
/**
* Pay cash to someone else.
*
* @param amount the amount of currency to issue on to the ledger.
* @param recipient the party to issue the cash to.
*/
class PayCash(val amount: Amount<Issued<Currency>>, val recipient: Party) : CashCommand()
/**
* Exit cash from the ledger.
*
* @param amount the amount of currency to exit from the ledger.
* @param issueRef the reference previously specified on the issuance.
*/
class ExitCash(val amount: Amount<Currency>, val issueRef: OpaqueBytes) : CashCommand()
}
sealed class CashProtocolResult {
/**
* @param transaction the transaction created as a result, in the case where the protocol completed successfully.
*/
class Success(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : CashProtocolResult() {
override fun toString() = "Success($message)"
}
/**
* State indicating the action undertaken failed, either directly (it is not something which requires a
* state machine), or before a state machine was started.
*/
class Failed(val message: String?) : CashProtocolResult() {
override fun toString() = "Failed($message)"
}
}
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")

View File

@ -546,6 +546,7 @@ class CashTests {
val wtx = makeSpend(100.DOLLARS, THEIR_PUBKEY_1)
@Suppress("UNCHECKED_CAST")
val vaultState = vaultService.states.elementAt(0) as StateAndRef<Cash.State>
assertEquals(vaultState.ref, wtx.inputs[0])
assertEquals(vaultState.state.data.copy(owner = THEIR_PUBKEY_1), wtx.outputs[0].data)
@ -572,6 +573,7 @@ class CashTests {
val wtx = makeSpend(10.DOLLARS, THEIR_PUBKEY_1)
@Suppress("UNCHECKED_CAST")
val vaultState = vaultService.states.elementAt(0) as StateAndRef<Cash.State>
assertEquals(vaultState.ref, wtx.inputs[0])
assertEquals(vaultState.state.data.copy(owner = THEIR_PUBKEY_1, amount = 10.DOLLARS `issued by` defaultIssuer), wtx.outputs[0].data)
@ -586,6 +588,7 @@ class CashTests {
databaseTransaction(database) {
val wtx = makeSpend(500.DOLLARS, THEIR_PUBKEY_1)
@Suppress("UNCHECKED_CAST")
val vaultState0 = vaultService.states.elementAt(0) as StateAndRef<Cash.State>
val vaultState1 = vaultService.states.elementAt(1)
assertEquals(vaultState0.ref, wtx.inputs[0])
@ -602,8 +605,10 @@ class CashTests {
val wtx = makeSpend(580.DOLLARS, THEIR_PUBKEY_1)
assertEquals(3, wtx.inputs.size)
@Suppress("UNCHECKED_CAST")
val vaultState0 = vaultService.states.elementAt(0) as StateAndRef<Cash.State>
val vaultState1 = vaultService.states.elementAt(1)
@Suppress("UNCHECKED_CAST")
val vaultState2 = vaultService.states.elementAt(2) as StateAndRef<Cash.State>
assertEquals(vaultState0.ref, wtx.inputs[0])
assertEquals(vaultState1.ref, wtx.inputs[1])

View File

@ -72,7 +72,7 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
if (type is ProtocolClassRef) {
val protocolLogicRef = node.services.protocolLogicRefFactory.createKotlin(type.className, args)
val protocolInstance = node.services.protocolLogicRefFactory.toProtocolLogic(protocolLogicRef)
return node.services.startProtocol(protocolInstance)
return node.services.startProtocol(protocolInstance).resultFuture
} else {
throw UnsupportedOperationException("Unsupported ProtocolRef type: $type")
}

View File

@ -14,6 +14,7 @@ import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -43,6 +44,8 @@ import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.*
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.protocols.sendRequest
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
@ -110,7 +113,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory
override fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T> = smm.add(logic).resultFuture
override fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T> = smm.add(logic)
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
require(markerClass !in protocolFactories) { "${markerClass.java.name} has already been used to register a protocol" }
@ -307,8 +310,24 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
}
}
private val defaultProtocolWhiteList: Map<Class<out ProtocolLogic<*>>, Set<Class<*>>> = mapOf(
CashProtocol::class.java to setOf(
CashCommand.IssueCash::class.java,
CashCommand.PayCash::class.java,
CashCommand.ExitCash::class.java
)
)
private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory {
val protocolWhitelist = HashMap<String, Set<String>>()
for ((protocolClass, extraArgumentTypes) in defaultProtocolWhiteList) {
val argumentWhitelistClassNames = HashSet(extraArgumentTypes.map { it.name })
protocolClass.constructors.forEach {
it.parameters.mapTo(argumentWhitelistClassNames) { it.type.name }
}
protocolWhitelist.merge(protocolClass.name, argumentWhitelistClassNames, { x, y -> x + y })
}
for (plugin in pluginRegistries) {
for ((className, classWhitelist) in plugin.requiredProtocols) {
protocolWhitelist.merge(className, classWhitelist, { x, y -> x + y })

View File

@ -1,8 +1,7 @@
package net.corda.node.internal
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.crypto.Party
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.keys
import net.corda.core.crypto.toStringShort
@ -11,16 +10,17 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.messaging.*
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.BroadcastTransactionProtocol
import net.corda.protocols.FinalityProtocol
import org.jetbrains.exposed.sql.Database
import rx.Observable
import java.security.KeyPair
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
@ -31,10 +31,6 @@ class CordaRPCOpsImpl(
val smm: StateMachineManager,
val database: Database
) : CordaRPCOps {
companion object {
const val CASH_PERMISSION = "CASH"
}
override val protocolVersion: Int get() = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
@ -67,17 +63,6 @@ class CordaRPCOpsImpl(
}
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
requirePermission(CASH_PERMISSION)
return databaseTransaction(database) {
when (command) {
is ClientToServiceCommand.IssueCash -> issueCash(command)
is ClientToServiceCommand.PayCash -> initiatePayment(command)
is ClientToServiceCommand.ExitCash -> exitCash(command)
}
}
}
override fun nodeIdentity(): NodeInfo {
return services.myInfo
}
@ -94,83 +79,14 @@ class CordaRPCOpsImpl(
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun initiatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
val (spendTX, keysForSigning) = services.vaultService.generateSpend(builder,
req.amount.withoutIssuer(), req.recipient.owningKey, setOf(req.amount.token.issuer.party))
keysForSigning.keys.forEach {
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
}
val tx = spendTX.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add(protocol).id,
tx,
"Cash payment transaction generated"
)
} catch(ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
try {
val issuer = PartyAndReference(services.myInfo.legalIdentity, req.issueRef)
Cash().generateExit(builder, req.amount.issuedBy(issuer),
services.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
val myKey = services.legalIdentityKey
builder.signWith(myKey)
// Work out who the owners of the burnt states were
val inputStatesNullable = services.vaultService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
}
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), participants)
return TransactionBuildResult.ProtocolStarted(
smm.add(protocol).id,
tx,
"Cash destruction transaction generated"
)
} catch (ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
val issuer = PartyAndReference(services.myInfo.legalIdentity, req.issueRef)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
val myKey = services.legalIdentityKey
builder.signWith(myKey)
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add(protocol).id,
tx,
"Cash issuance completed"
// TODO: Check that this protocol is annotated as being intended for RPC invocation
override fun <T: Any> startProtocolDynamic(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolHandle<T> {
requirePermission(startProtocolPermission(logicType))
val stateMachine = services.invokeProtocolAsync(logicType, *args) as ProtocolStateMachineImpl<T>
return ProtocolHandle(
id = stateMachine.id,
progress = stateMachine.logic.progressTracker?.changes ?: Observable.empty<ProgressTracker.Change>(),
returnValue = stateMachine.resultFuture.toObservable()
)
}
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services
import com.typesafe.config.Config
import net.corda.core.protocols.ProtocolLogic
import net.corda.node.services.config.getListOrElse
/**
@ -9,7 +10,7 @@ import net.corda.node.services.config.getListOrElse
* to. These permissions are represented as [String]s to allow RPC implementations to add their own permissioning.
*/
interface RPCUserService {
fun getUser(usename: String): User?
fun getUser(username: String): User?
val users: List<User>
}
@ -25,13 +26,13 @@ class RPCUserServiceImpl(config: Config) : RPCUserService {
val username = it.getString("user")
require(username.matches("\\w+".toRegex())) { "Username $username contains invalid characters" }
val password = it.getString("password")
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.map(String::toUpperCase).toSet()
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet()
User(username, password, permissions)
}
.associateBy(User::username)
}
override fun getUser(usename: String): User? = _users[usename]
override fun getUser(username: String): User? = _users[username]
override val users: List<User> get() = _users.values.toList()
}
@ -39,3 +40,6 @@ class RPCUserServiceImpl(config: Config) : RPCUserService {
data class User(val username: String, val password: String, val permissions: Set<String>) {
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
}
fun <P : ProtocolLogic<*>> startProtocolPermission(clazz: Class<P>) = "StartProtocol.${clazz.name}"
inline fun <reified P : ProtocolLogic<*>> startProtocolPermission(): String = startProtocolPermission(P::class.java)

View File

@ -6,6 +6,7 @@ import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import org.slf4j.LoggerFactory
@ -67,9 +68,9 @@ abstract class ServiceHubInternal : PluginServiceHub {
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler).
*/
abstract fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T>
abstract fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T>
override fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T> {
override fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolStateMachine<T> {
val logicRef = protocolLogicRefFactory.create(logicType, *args)
@Suppress("UNCHECKED_CAST")
val logic = protocolLogicRefFactory.toProtocolLogic(logicRef) as ProtocolLogic<T>

View File

@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import net.corda.core.contracts.ClientToServiceCommand
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
@ -8,8 +7,10 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
@ -54,31 +55,9 @@ sealed class StateMachineUpdate(val id: StateMachineRunId) {
}
}
sealed class TransactionBuildResult {
/**
* State indicating that a protocol is managing this request, and that the client should track protocol state machine
* updates for further information. The monitor will separately receive notification of the state machine having been
* added, as it would any other state machine. This response is used solely to enable the monitor to identify
* the state machine (and its progress) as associated with the request.
*
* @param transaction the transaction created as a result, in the case where the protocol has completed.
*/
class ProtocolStarted(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() {
override fun toString() = "Started($message)"
}
/**
* State indicating the action undertaken failed, either directly (it is not something which requires a
* state machine), or before a state machine was started.
*/
class Failed(val message: String?) : TransactionBuildResult() {
override fun toString() = "Failed($message)"
}
}
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [ServerRPCOps] class.
* client apps and are implemented by the node in the [CordaRPCOpsImpl] class.
*/
interface CordaRPCOps : RPCOps {
/**
@ -113,15 +92,17 @@ interface CordaRPCOps : RPCOps {
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Executes the given command if the user is permissioned to do so, possibly triggering cash creation etc.
* TODO: The signature of this is weird because it's the remains of an old service call, we should have a call for each command instead.
* Start the given protocol with the given arguments, returning an [Observable] with a single observation of the
* result of running the protocol.
*/
fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult
@RPCReturnsObservables
fun <T: Any> startProtocolDynamic(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolHandle<T>
/**
* Returns Node's identity, assuming this will not change while the node is running.
*/
fun nodeIdentity(): NodeInfo
/*
* Add note(s) to an existing Vault transaction
*/
@ -132,3 +113,50 @@ interface CordaRPCOps : RPCOps {
*/
fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String>
}
/**
* These allow type safe invocations of protocols from Kotlin, e.g.:
*
* val rpc: CordaRPCOps = (..)
* rpc.startProtocol(::ResolveTransactionsProtocol, setOf<SecureHash>(), aliceIdentity)
*
* Note that the passed in constructor function is only used for unification of other type parameters and reification of
* the Class instance of the protocol. This could be changed to use the constructor function directly.
*/
inline fun <T : Any, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: () -> R
) = startProtocolDynamic(R::class.java)
inline fun <T : Any, A, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A) -> R,
arg0: A
) = startProtocolDynamic(R::class.java, arg0)
inline fun <T : Any, A, B, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B) -> R,
arg0: A,
arg1: B
) = startProtocolDynamic(R::class.java, arg0, arg1)
inline fun <T : Any, A, B, C, reified R: ProtocolLogic<T>> CordaRPCOps.startProtocol(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C
) = startProtocolDynamic(R::class.java, arg0, arg1, arg2)
inline fun <T : Any, A, B, C, D, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B, C, D) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D
) = startProtocolDynamic(R::class.java, arg0, arg1, arg2, arg3)
data class ProtocolHandle<A>(
val id: StateMachineRunId,
val progress: Observable<ProgressTracker.Change>,
val returnValue: Observable<A>
)

View File

@ -28,6 +28,7 @@ import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.User
import net.corda.protocols.CashProtocolResult
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.objenesis.strategy.StdInstantiatorStrategy
@ -173,8 +174,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(TransactionBuildResult.ProtocolStarted::class.java)
register(TransactionBuildResult.Failed::class.java)
register(CashProtocolResult.Success::class.java)
register(CashProtocolResult.Failed::class.java)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
@ -204,6 +205,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(RPCException::class.java)
register(Array<StackTraceElement>::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
register(PermissionException::class.java)
register(ProtocolHandle::class.java)
}
// Helper method, attempt to reduce boiler plate code

View File

@ -34,7 +34,7 @@ class RPCUserServiceImplTest {
@Test
fun `single permission, which is in lower case`() {
val service = loadWithContents("rpcUsers : [{ user=user1, password=letmein, permissions=[cash] }]")
assertThat(service.getUser("user1")?.permissions).containsOnly("CASH")
assertThat(service.getUser("user1")?.permissions).containsOnly("cash")
}
@Test

View File

@ -12,9 +12,13 @@ import net.corda.node.services.User
import net.corda.node.services.messaging.CURRENT_RPC_USER
import net.corda.node.services.messaging.PermissionException
import net.corda.node.services.messaging.StateMachineUpdate
import net.corda.node.services.messaging.startProtocol
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.node.MockNetwork
@ -44,7 +48,7 @@ class CordaRPCOpsImplTest {
aliceNode = network.createNode(networkMapAddress = networkMap.info.address)
notaryNode = network.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address)
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION)))
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(startProtocolPermission<CashProtocol>())))
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
@ -63,8 +67,8 @@ class CordaRPCOpsImplTest {
// Tell the monitoring service node to issue some cash
val recipient = aliceNode.info.legalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, notaryNode.info.notaryIdentity)
rpc.executeCommand(outEvent)
val outEvent = CashCommand.IssueCash(Amount(quantity, GBP), ref, recipient, notaryNode.info.notaryIdentity)
rpc.startProtocol(::CashProtocol, outEvent)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
@ -101,7 +105,7 @@ class CordaRPCOpsImplTest {
@Test
fun `issue and move`() {
rpc.executeCommand(ClientToServiceCommand.IssueCash(
rpc.startProtocol(::CashProtocol, CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.info.legalIdentity,
@ -110,7 +114,7 @@ class CordaRPCOpsImplTest {
network.runNetwork()
rpc.executeCommand(ClientToServiceCommand.PayCash(
rpc.startProtocol(::CashProtocol, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.info.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.info.legalIdentity
))
@ -182,7 +186,7 @@ class CordaRPCOpsImplTest {
fun `cash command by user not permissioned for cash`() {
CURRENT_RPC_USER.set(User("user", "pwd", permissions = emptySet()))
assertThatExceptionOfType(PermissionException::class.java).isThrownBy {
rpc.executeCommand(ClientToServiceCommand.IssueCash(
rpc.startProtocol(::CashProtocol, CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.info.legalIdentity,

View File

@ -53,7 +53,7 @@ class AttachmentTests {
network.runNetwork()
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
network.runNetwork()
assertEquals(0, f1.get().fromDisk.size)
assertEquals(0, f1.resultFuture.get().fromDisk.size)
// Verify it was inserted into node one's store.
val attachment = n1.storage.attachments.openAttachment(id)!!
@ -62,7 +62,7 @@ class AttachmentTests {
// Shut down node zero and ensure node one can still resolve the attachment.
n0.stop()
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity)).get()
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity)).resultFuture.get()
assertEquals(attachment, response.fromDisk[0])
}
@ -75,7 +75,7 @@ class AttachmentTests {
network.runNetwork()
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(hash), n0.info.legalIdentity))
network.runNetwork()
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.resultFuture.get() } }
assertEquals(hash, e.requested)
}
@ -107,7 +107,7 @@ class AttachmentTests {
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
network.runNetwork()
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
rootCauseExceptions { f1.get() }
rootCauseExceptions { f1.resultFuture.get() }
}
}
}

View File

@ -1,12 +1,12 @@
package net.corda.node.services
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.*
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.transactions.SignedTransaction
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.MessagingServiceInternal
@ -79,7 +79,7 @@ open class MockServiceHubInternal(
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T> = smm.add(logic).resultFuture
override fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T> = smm.add(logic)
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
protocolFactories[markerClass.java] = protocolFactory

View File

@ -53,7 +53,7 @@ class NotaryChangeTests {
net.runNetwork()
val newState = future.get()
val newState = future.resultFuture.get()
assertEquals(newState.state.notary, newNotary)
}
@ -66,7 +66,7 @@ class NotaryChangeTests {
net.runNetwork()
val newState = future.get()
val newState = future.resultFuture.get()
assertEquals(newState.state.notary, newNotary)
val loadedStateA = clientNodeA.services.loadState(newState.ref)
val loadedStateB = clientNodeB.services.loadState(newState.ref)
@ -82,7 +82,7 @@ class NotaryChangeTests {
net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() }
val ex = assertFailsWith(ExecutionException::class) { future.resultFuture.get() }
val error = (ex.cause as StateReplacementException).error
assertTrue(error is StateReplacementRefused)
}

View File

@ -101,7 +101,7 @@ class NotaryServiceTests {
net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() }
val ex = assertFailsWith(ExecutionException::class) { future.resultFuture.get() }
val notaryError = (ex.cause as NotaryException).error as NotaryError.Conflict
assertEquals(notaryError.tx, stx.tx)
notaryError.conflict.verified()
@ -110,7 +110,7 @@ class NotaryServiceTests {
private fun runNotaryClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.WithKey> {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.services.startProtocol(protocol)
val future = clientNode.services.startProtocol(protocol).resultFuture
net.runNetwork()
return future
}

View File

@ -80,7 +80,7 @@ class ValidatingNotaryServiceTests {
private fun runClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.WithKey> {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.services.startProtocol(protocol)
val future = clientNode.services.startProtocol(protocol).resultFuture
net.runNetwork()
return future
}

View File

@ -96,7 +96,7 @@ class DataVendingServiceTests {
private class NotifyTxProtocol(val otherParty: Party, val stx: SignedTransaction) : ProtocolLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, NotifyTxRequest(stx, emptySet()))
override fun call() = send(otherParty, NotifyTxRequest(stx))
}
}

View File

@ -1,6 +1,5 @@
package net.corda.testing.node
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.*
import net.corda.core.messaging.MessagingService
@ -9,6 +8,7 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.*
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
@ -38,7 +38,7 @@ import javax.annotation.concurrent.ThreadSafe
* building chains of transactions and verifying them. It isn't sufficient for testing protocols however.
*/
open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
override fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T> {
override fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolStateMachine<T> {
throw UnsupportedOperationException("not implemented")
}

View File

@ -1,5 +1,6 @@
package net.corda.explorer
import javafx.stage.Stage
import net.corda.client.mock.EventGenerator
import net.corda.client.model.Models
import net.corda.client.model.NodeMonitorModel
@ -7,12 +8,13 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.explorer.views.runInFxApplicationThread
import net.corda.node.driver.PortAllocation
import net.corda.node.driver.driver
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.User
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.startProtocol
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.transactions.SimpleNotaryService
import javafx.stage.Stage
import net.corda.protocols.CashProtocol
import org.controlsfx.dialog.ExceptionDialog
import tornadofx.App
import java.util.*
@ -43,7 +45,7 @@ class Main : App() {
fun main(args: Array<String>) {
val portAllocation = PortAllocation.Incremental(20000)
driver(portAllocation = portAllocation) {
val user = User("user1", "test", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION))
val user = User("user1", "test", permissions = setOf(startProtocolPermission<CashProtocol>()))
val notary = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val alice = startNode("Alice", rpcUsers = arrayListOf(user))
val bob = startNode("Bob", rpcUsers = arrayListOf(user))
@ -67,7 +69,7 @@ fun main(args: Array<String>) {
notary = notaryNode.nodeInfo.notaryIdentity
)
eventGenerator.clientToServiceCommandGenerator.map { command ->
rpcProxy?.executeCommand(command)
rpcProxy?.startProtocol(::CashProtocol, command)
}.generate(Random())
}
waitForAllNodesToFinish()

View File

@ -1,5 +1,14 @@
package net.corda.explorer.views
import javafx.beans.binding.Bindings
import javafx.beans.binding.BooleanBinding
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import javafx.scene.Node
import javafx.scene.Parent
import javafx.scene.control.*
import javafx.util.converter.BigDecimalStringConverter
import net.corda.client.fxutils.map
import net.corda.client.model.NetworkIdentityModel
import net.corda.client.model.NodeMonitorModel
@ -10,16 +19,10 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.explorer.model.CashTransaction
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.TransactionBuildResult
import javafx.beans.binding.Bindings
import javafx.beans.binding.BooleanBinding
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import javafx.scene.Node
import javafx.scene.Parent
import javafx.scene.control.*
import javafx.util.converter.BigDecimalStringConverter
import net.corda.node.services.messaging.startProtocol
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.protocols.CashProtocolResult
import org.controlsfx.dialog.ExceptionDialog
import tornadofx.View
import java.math.BigDecimal
@ -126,9 +129,9 @@ class NewTransaction : View() {
val issueRef = OpaqueBytes(if (issueRefTextField.text.trim().isNotBlank()) issueRefTextField.text.toByteArray() else ByteArray(1, { 1 }))
// TODO : Change these commands into individual RPC methods instead of using executeCommand.
val command = when (it) {
CashTransaction.Issue -> ClientToServiceCommand.IssueCash(Amount(textFormatter.value, currency.value), issueRef, partyBChoiceBox.value.legalIdentity, notary.notaryIdentity)
CashTransaction.Pay -> ClientToServiceCommand.PayCash(Amount(textFormatter.value, Issued(PartyAndReference(myIdentity.legalIdentity, issueRef), currency.value)), partyBChoiceBox.value.legalIdentity)
CashTransaction.Exit -> ClientToServiceCommand.ExitCash(Amount(textFormatter.value, currency.value), issueRef)
CashTransaction.Issue -> CashCommand.IssueCash(Amount(textFormatter.value, currency.value), issueRef, partyBChoiceBox.value.legalIdentity, notary.notaryIdentity)
CashTransaction.Pay -> CashCommand.PayCash(Amount(textFormatter.value, Issued(PartyAndReference(myIdentity.legalIdentity, issueRef), currency.value)), partyBChoiceBox.value.legalIdentity)
CashTransaction.Exit -> CashCommand.ExitCash(Amount(textFormatter.value, currency.value), issueRef)
}
val dialog = Alert(Alert.AlertType.INFORMATION).apply {
headerText = null
@ -138,15 +141,15 @@ class NewTransaction : View() {
}
dialog.show()
runAsync {
rpcProxy.executeCommand(command)
rpcProxy.startProtocol(::CashProtocol, command).returnValue.toBlocking().first()
}.ui {
dialog.contentText = when (it) {
is TransactionBuildResult.ProtocolStarted -> {
is CashProtocolResult.Success -> {
dialog.alertType = Alert.AlertType.INFORMATION
dialog.setOnCloseRequest { resetScreen() }
"Transaction Started \nTransaction ID : ${it.transaction?.id} \nMessage : ${it.message}"
}
is TransactionBuildResult.Failed -> {
is CashProtocolResult.Failed -> {
dialog.alertType = Alert.AlertType.ERROR
it.toString()
}