mirror of
https://github.com/corda/corda.git
synced 2025-06-01 23:20:54 +00:00
Start cleaning up node API Dokka comments (#1397)
* Change RPCApi comments to inline from Dokka, as the existing comments are not API documentation, and render simply as "The RPC protocol:" in several places, which is unhelpful. * Rewrite RPCApi comments to no longer be in semi-Dokka format, but instead better reflect they are functional documentation. * Add Dokka documentation around RPC API * Change JvmStatics to const
This commit is contained in:
parent
8537adaa96
commit
0df93e6e50
@ -4,14 +4,6 @@ import net.corda.core.serialization.SerializationContext
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.nodeapi.RPCApi.ClientToServer
|
|
||||||
import net.corda.nodeapi.RPCApi.ObservableId
|
|
||||||
import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVALS
|
|
||||||
import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
|
|
||||||
import net.corda.nodeapi.RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX
|
|
||||||
import net.corda.nodeapi.RPCApi.RPC_SERVER_QUEUE_NAME
|
|
||||||
import net.corda.nodeapi.RPCApi.RpcRequestId
|
|
||||||
import net.corda.nodeapi.RPCApi.ServerToClient
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.*
|
import org.apache.activemq.artemis.api.core.client.*
|
||||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||||
@ -20,41 +12,43 @@ import org.apache.activemq.artemis.reader.MessageUtil
|
|||||||
import rx.Notification
|
import rx.Notification
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
// The RPC protocol:
|
||||||
|
//
|
||||||
|
// The server consumes the queue "RPC_SERVER_QUEUE_NAME" and receives RPC requests (ClientToServer.RpcRequest) on it.
|
||||||
|
// When a client starts up it should create a queue for its inbound messages, this should be of the form
|
||||||
|
// "RPC_CLIENT_QUEUE_NAME_PREFIX.$username.$nonce". Each RPC request contains this address (in
|
||||||
|
// ClientToServer.RpcRequest.clientAddress), this is where the server will send the reply to the request as well as
|
||||||
|
// subsequent Observations rooted in the RPC. The requests/replies are muxed using a unique RpcRequestId generated by
|
||||||
|
// the client for each request.
|
||||||
|
//
|
||||||
|
// If an RPC reply's payload (ServerToClient.RpcReply.result) contains observables then the server will generate a
|
||||||
|
// unique ObservableId for each and serialise them in place of the observables themselves. Subsequently the client
|
||||||
|
// should be prepared to receive observations (ServerToClient.Observation), muxed by the relevant ObservableId.
|
||||||
|
// In addition each observation itself may contain further observables, this case should behave the same as before.
|
||||||
|
//
|
||||||
|
// Additionally the client may send ClientToServer.ObservablesClosed messages indicating that certain observables
|
||||||
|
// aren't consumed anymore, which should subsequently stop the stream from the server. Note that some observations may
|
||||||
|
// already be in flight when this is sent, the client should handle this gracefully.
|
||||||
|
//
|
||||||
|
// An example session:
|
||||||
|
// Client Server
|
||||||
|
// ----------RpcRequest(RID0)-----------> // Client makes RPC request with ID "RID0"
|
||||||
|
// <----RpcReply(RID0, Payload(OID0))---- // Server sends reply containing an observable with ID "OID0"
|
||||||
|
// <---------Observation(OID0)----------- // Server sends observation onto "OID0"
|
||||||
|
// <---Observation(OID0, Payload(OID1))-- // Server sends another observation, this time containing another observable
|
||||||
|
// <---------Observation(OID1)----------- // Observation onto new "OID1"
|
||||||
|
// <---------Observation(OID0)-----------
|
||||||
|
// -----ObservablesClosed(OID0, OID1)---> // Client indicates it stopped consuming the observables.
|
||||||
|
// <---------Observation(OID1)----------- // Observation was already in-flight before the previous message was processed
|
||||||
|
// (FIN)
|
||||||
|
//
|
||||||
|
// Note that multiple sessions like the above may interleave in an arbitrary fashion.
|
||||||
|
//
|
||||||
|
// Additionally the server may listen on client binding removals for cleanup using RPC_CLIENT_BINDING_REMOVALS. This
|
||||||
|
// requires the server to create a filter on the Artemis notification address using RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The RPC protocol:
|
* Constants and data types used by the RPC API.
|
||||||
*
|
|
||||||
* The server consumes the queue "[RPC_SERVER_QUEUE_NAME]" and receives RPC requests ([ClientToServer.RpcRequest]) on it.
|
|
||||||
* When a client starts up it should create a queue for its inbound messages, this should be of the form
|
|
||||||
* "[RPC_CLIENT_QUEUE_NAME_PREFIX].$username.$nonce". Each RPC request contains this address (in
|
|
||||||
* [ClientToServer.RpcRequest.clientAddress]), this is where the server will send the reply to the request as well as
|
|
||||||
* subsequent Observations rooted in the RPC. The requests/replies are muxed using a unique [RpcRequestId] generated by
|
|
||||||
* the client for each request.
|
|
||||||
*
|
|
||||||
* If an RPC reply's payload ([ServerToClient.RpcReply.result]) contains [Observable]s then the server will generate a
|
|
||||||
* unique [ObservableId] for each and serialise them in place of the [Observable]s themselves. Subsequently the client
|
|
||||||
* should be prepared to receive observations ([ServerToClient.Observation]), muxed by the relevant [ObservableId].
|
|
||||||
* In addition each observation itself may contain further [Observable]s, this case should behave the same as before.
|
|
||||||
*
|
|
||||||
* Additionally the client may send [ClientToServer.ObservablesClosed] messages indicating that certain observables
|
|
||||||
* aren't consumed anymore, which should subsequently stop the stream from the server. Note that some observations may
|
|
||||||
* already be in flight when this is sent, the client should handle this gracefully.
|
|
||||||
*
|
|
||||||
* An example session:
|
|
||||||
* Client Server
|
|
||||||
* ----------RpcRequest(RID0)-----------> // Client makes RPC request with ID "RID0"
|
|
||||||
* <----RpcReply(RID0, Payload(OID0))---- // Server sends reply containing an observable with ID "OID0"
|
|
||||||
* <---------Observation(OID0)----------- // Server sends observation onto "OID0"
|
|
||||||
* <---Observation(OID0, Payload(OID1))-- // Server sends another observation, this time containing another observable
|
|
||||||
* <---------Observation(OID1)----------- // Observation onto new "OID1"
|
|
||||||
* <---------Observation(OID0)-----------
|
|
||||||
* -----ObservablesClosed(OID0, OID1)---> // Client indicates it stopped consuming the Observables.
|
|
||||||
* <---------Observation(OID1)----------- // Observation was already in-flight before the previous message was processed
|
|
||||||
* (FIN)
|
|
||||||
*
|
|
||||||
* Note that multiple sessions like the above may interleave in an arbitrary fashion.
|
|
||||||
*
|
|
||||||
* Additionally the server may listen on client binding removals for cleanup using [RPC_CLIENT_BINDING_REMOVALS]. This
|
|
||||||
* requires the server to create a filter on the artemis notification address using [RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION]
|
|
||||||
*/
|
*/
|
||||||
object RPCApi {
|
object RPCApi {
|
||||||
private val TAG_FIELD_NAME = "tag"
|
private val TAG_FIELD_NAME = "tag"
|
||||||
@ -62,10 +56,15 @@ object RPCApi {
|
|||||||
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
||||||
private val METHOD_NAME_FIELD_NAME = "method-name"
|
private val METHOD_NAME_FIELD_NAME = "method-name"
|
||||||
|
|
||||||
val RPC_SERVER_QUEUE_NAME = "rpc.server"
|
/** Name of the Artemis queue on which the server receives RPC requests (as [ClientToServer.RpcRequest]). */
|
||||||
val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client"
|
const val RPC_SERVER_QUEUE_NAME = "rpc.server"
|
||||||
val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals"
|
/**
|
||||||
val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
|
* Prefix to Artemis queue names used by clients to receive communication back from a server. The full queue name
|
||||||
|
* should be of the form "rpc.client.<username>.<nonce>".
|
||||||
|
*/
|
||||||
|
const val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client"
|
||||||
|
const val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals"
|
||||||
|
const val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
|
||||||
|
|
||||||
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
||||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
||||||
@ -83,12 +82,23 @@ object RPCApi {
|
|||||||
return ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
|
return ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message content types which can be sent from a Corda client to a server.
|
||||||
|
*/
|
||||||
sealed class ClientToServer {
|
sealed class ClientToServer {
|
||||||
private enum class Tag {
|
private enum class Tag {
|
||||||
RPC_REQUEST,
|
RPC_REQUEST,
|
||||||
OBSERVABLES_CLOSED
|
OBSERVABLES_CLOSED
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request to a server to trigger the specified method with the provided arguments.
|
||||||
|
*
|
||||||
|
* @param clientAddress return address to contact the client at.
|
||||||
|
* @param id a unique ID for the request, which the server will use to identify its response with.
|
||||||
|
* @param methodName name of the method (procedure) to be called.
|
||||||
|
* @param arguments arguments to pass to the method, if any.
|
||||||
|
*/
|
||||||
data class RpcRequest(
|
data class RpcRequest(
|
||||||
val clientAddress: SimpleString,
|
val clientAddress: SimpleString,
|
||||||
val id: RpcRequestId,
|
val id: RpcRequestId,
|
||||||
@ -141,6 +151,9 @@ object RPCApi {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message content types which can be sent from a Corda server back to a client.
|
||||||
|
*/
|
||||||
sealed class ServerToClient {
|
sealed class ServerToClient {
|
||||||
private enum class Tag {
|
private enum class Tag {
|
||||||
RPC_REPLY,
|
RPC_REPLY,
|
||||||
@ -149,6 +162,7 @@ object RPCApi {
|
|||||||
|
|
||||||
abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage)
|
abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage)
|
||||||
|
|
||||||
|
/** Reply in response to an [ClientToServer.RpcRequest]. */
|
||||||
data class RpcReply(
|
data class RpcReply(
|
||||||
val id: RpcRequestId,
|
val id: RpcRequestId,
|
||||||
val result: Try<Any?>
|
val result: Try<Any?>
|
||||||
|
@ -41,10 +41,17 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep
|
|||||||
constructor(msg: String) : this(msg, null)
|
constructor(msg: String) : this(msg, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown to indicate that the calling user does not have permission for something they have requested (for example
|
||||||
|
* calling a method).
|
||||||
|
*/
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
class PermissionException(msg: String) : RuntimeException(msg)
|
class PermissionException(msg: String) : RuntimeException(msg)
|
||||||
|
|
||||||
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
|
/**
|
||||||
|
* The Kryo used for the RPC wire protocol.
|
||||||
|
*/
|
||||||
|
// Every type in the wire protocol is listed here explicitly.
|
||||||
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
|
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
|
||||||
// because we can see everything we're using in one place.
|
// because we can see everything we're using in one place.
|
||||||
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
|
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user