Client RPC API Tutorial

In this tutorial we will build a simple command line utility that connects to a node, creates some Cash transactions and meanwhile dumps the transaction graph to the standard output. We will then put some simple visualisation on top. For an explanation on how the RPC works see Client RPC.

We start off by connecting to the node itself. For the purposes of the tutorial we will use the Driver to start up a notary and a node that issues/exits and moves Cash around for herself. To authenticate we will use the certificates of the nodes directly.

Note how we configure the node to create a user that has permission to start the CashFlow.

enum class PrintOrVisualise {
    Print,
    Visualise
}

fun main(args: Array<String>) {
    if (args.size < 1) {
        throw IllegalArgumentException("Usage: <binary> [Print|Visualise]")
    }
    val printOrVisualise = PrintOrVisualise.valueOf(args[0])

    val baseDirectory = Paths.get("build/rpc-api-tutorial")
    val user = User("user", "password", permissions = setOf(startFlowPermission<CashFlow>()))

    driver(driverDirectory = baseDirectory) {
        startNode("Notary", advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)))
        val node = startNode("Alice", rpcUsers = listOf(user)).get()
        val sslConfig = object : NodeSSLConfiguration {
            override val certificatesPath = baseDirectory / "Alice" / "certificates"
            override val keyStorePassword = "cordacadevpass"
            override val trustStorePassword = "trustpass"
        }

Now we can connect to the node itself using a valid RPC login. We login using the configured user.

        val client = CordaRPCClient(FullNodeConfiguration(node.config).artemisAddress, sslConfig)
        client.start("user", "password")
        val proxy = client.proxy()

        thread {
            generateTransactions(proxy)
        }

We start generating transactions in a different thread (generateTransactions to be defined later) using proxy, which exposes the full RPC interface of the node:

    /**
     * Returns a pair of currently in-progress state machine infos and an observable of future state machine adds/removes.
     */
    @RPCReturnsObservables
    fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>>

    /**
     * Returns a pair of head states in the vault and an observable of future updates to the vault.
     */
    @RPCReturnsObservables
    fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>>

    /**
     * Returns a pair of all recorded transactions and an observable of future recorded ones.
     */
    @RPCReturnsObservables
    fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>>

    /**
     * Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future
     * such mappings as well.
     */
    @RPCReturnsObservables
    fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>

    /**
     * Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
     */
    @RPCReturnsObservables
    fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>

    /**
     * Start the given flow with the given arguments, returning an [Observable] with a single observation of the
     * result of running the flow.
     */
    @RPCReturnsObservables
    fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>

    /**
     * Returns Node's identity, assuming this will not change while the node is running.
     */
    fun nodeIdentity(): NodeInfo

    /*
     * Add note(s) to an existing Vault transaction
     */
    fun addVaultTransactionNote(txnId: SecureHash, txnNote: String)

    /*
     * Retrieve existing note(s) for a given Vault transaction
     */
    fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String>

The one we need in order to dump the transaction graph is verifiedTransactions. The type signature tells us that the RPC will return a list of transactions and an Observable stream. This is a general pattern, we query some data and the node will return the current snapshot and future updates done to it.

        val (transactions: List<SignedTransaction>, futureTransactions: Observable<SignedTransaction>) = proxy.verifiedTransactions()

The graph will be defined by nodes and edges between them. Each node represents a transaction and edges represent output-input relations. For now let’s just print NODE <txhash> for the former and EDGE <txhash> <txhash> for the latter.

        when (printOrVisualise) {
            PrintOrVisualise.Print -> {
                futureTransactions.startWith(transactions).subscribe { transaction ->
                    println("NODE ${transaction.id}")
                    transaction.tx.inputs.forEach { input ->
                        println("EDGE ${input.txhash} ${transaction.id}")
                    }
                }
            }

Now we just need to create the transactions themselves!

fun generateTransactions(proxy: CordaRPCOps) {
    var ownedQuantity = proxy.vaultAndUpdates().first.fold(0L) { sum, state ->
        sum + (state.state.data as Cash.State).amount.quantity
    }
    val issueRef = OpaqueBytes.of(0)
    val notary = proxy.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
    val me = proxy.nodeIdentity().legalIdentity
    val meAndRef = PartyAndReference(me, issueRef)
    while (true) {
        Thread.sleep(1000)
        val random = SplittableRandom()
        val n = random.nextDouble()
        if (ownedQuantity > 10000 && n > 0.8) {
            val quantity = Math.abs(random.nextLong()) % 2000
            proxy.startFlow(::CashFlow, CashCommand.ExitCash(Amount(quantity, USD), issueRef))
            ownedQuantity -= quantity
        } else if (ownedQuantity > 1000 && n < 0.7) {
            val quantity = Math.abs(random.nextLong() % Math.min(ownedQuantity, 2000))
            proxy.startFlow(::CashFlow, CashCommand.PayCash(Amount(quantity, Issued(meAndRef, USD)), me))
        } else {
            val quantity = Math.abs(random.nextLong() % 1000)
            proxy.startFlow(::CashFlow, CashCommand.IssueCash(Amount(quantity, USD), issueRef, me, notary))
            ownedQuantity += quantity
        }
    }
}

We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault.

Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.

The RPC we need to initiate a Cash transaction is startFlowDynamic which may start an arbitrary flow, given sufficient permissions to do so. We won’t use this function directly, but rather a type-safe wrapper around it startFlow that type-checks the arguments for us.

Finally we have everything in place: we start a couple of nodes, connect to them, and start creating transactions while listening on successfully created ones, which are dumped to the console. We just need to run it!:

# Build the example ./gradlew docs/source/example-code:installDist # Start it ./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print

Now let’s try to visualise the transaction graph. We will use a graph drawing library called graphstream

            PrintOrVisualise.Visualise -> {
                val graph = MultiGraph("transactions")
                transactions.forEach { transaction ->
                    graph.addNode<Node>("${transaction.id}")
                }
                transactions.forEach { transaction ->
                    transaction.tx.inputs.forEach { ref ->
                        graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
                    }
                }
                futureTransactions.subscribe { transaction ->
                    graph.addNode<Node>("${transaction.id}")
                    transaction.tx.inputs.forEach { ref ->
                        graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
                    }
                }
                graph.display()
            }
        }
        waitForAllNodesToFinish()
    }

}

If we run the client with Visualise we should see a simple random graph being drawn as new transactions are being created.

Registering classes from your Cordapp with RPC Kryo

As described in Client RPC, you currently have to register any additional classes you add that are needed in RPC requests or responses with the Kryo instance RPC uses. Here’s an example of how you do this for an example class.

data class ExampleRPCValue(val foo: String)

class ExampleRPCCordaPluginRegistry : CordaPluginRegistry() {
    override fun registerRPCKryoTypes(kryo: Kryo): Boolean {
        // Add classes like this.
        kryo.register(ExampleRPCValue::class.java)
        // You should return true, otherwise your plugin will be ignored for registering classes with Kryo.
        return true
    }
}

See more on plugins in Creating a Cordapp.

Warning

We will be replacing the use of Kryo in RPC with a stable message format and this will mean that this plugin customisation point will either go away completely or change.