diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt index e3376e482c..408276fded 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt @@ -4,14 +4,6 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.Try -import net.corda.nodeapi.RPCApi.ClientToServer -import net.corda.nodeapi.RPCApi.ObservableId -import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVALS -import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION -import net.corda.nodeapi.RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX -import net.corda.nodeapi.RPCApi.RPC_SERVER_QUEUE_NAME -import net.corda.nodeapi.RPCApi.RpcRequestId -import net.corda.nodeapi.RPCApi.ServerToClient import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.management.CoreNotificationType @@ -20,41 +12,43 @@ import org.apache.activemq.artemis.reader.MessageUtil import rx.Notification import java.util.* +// The RPC protocol: +// +// The server consumes the queue "RPC_SERVER_QUEUE_NAME" and receives RPC requests (ClientToServer.RpcRequest) on it. +// When a client starts up it should create a queue for its inbound messages, this should be of the form +// "RPC_CLIENT_QUEUE_NAME_PREFIX.$username.$nonce". Each RPC request contains this address (in +// ClientToServer.RpcRequest.clientAddress), this is where the server will send the reply to the request as well as +// subsequent Observations rooted in the RPC. The requests/replies are muxed using a unique RpcRequestId generated by +// the client for each request. +// +// If an RPC reply's payload (ServerToClient.RpcReply.result) contains observables then the server will generate a +// unique ObservableId for each and serialise them in place of the observables themselves. Subsequently the client +// should be prepared to receive observations (ServerToClient.Observation), muxed by the relevant ObservableId. +// In addition each observation itself may contain further observables, this case should behave the same as before. +// +// Additionally the client may send ClientToServer.ObservablesClosed messages indicating that certain observables +// aren't consumed anymore, which should subsequently stop the stream from the server. Note that some observations may +// already be in flight when this is sent, the client should handle this gracefully. +// +// An example session: +// Client Server +// ----------RpcRequest(RID0)-----------> // Client makes RPC request with ID "RID0" +// <----RpcReply(RID0, Payload(OID0))---- // Server sends reply containing an observable with ID "OID0" +// <---------Observation(OID0)----------- // Server sends observation onto "OID0" +// <---Observation(OID0, Payload(OID1))-- // Server sends another observation, this time containing another observable +// <---------Observation(OID1)----------- // Observation onto new "OID1" +// <---------Observation(OID0)----------- +// -----ObservablesClosed(OID0, OID1)---> // Client indicates it stopped consuming the observables. +// <---------Observation(OID1)----------- // Observation was already in-flight before the previous message was processed +// (FIN) +// +// Note that multiple sessions like the above may interleave in an arbitrary fashion. +// +// Additionally the server may listen on client binding removals for cleanup using RPC_CLIENT_BINDING_REMOVALS. This +// requires the server to create a filter on the Artemis notification address using RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION + /** - * The RPC protocol: - * - * The server consumes the queue "[RPC_SERVER_QUEUE_NAME]" and receives RPC requests ([ClientToServer.RpcRequest]) on it. - * When a client starts up it should create a queue for its inbound messages, this should be of the form - * "[RPC_CLIENT_QUEUE_NAME_PREFIX].$username.$nonce". Each RPC request contains this address (in - * [ClientToServer.RpcRequest.clientAddress]), this is where the server will send the reply to the request as well as - * subsequent Observations rooted in the RPC. The requests/replies are muxed using a unique [RpcRequestId] generated by - * the client for each request. - * - * If an RPC reply's payload ([ServerToClient.RpcReply.result]) contains [Observable]s then the server will generate a - * unique [ObservableId] for each and serialise them in place of the [Observable]s themselves. Subsequently the client - * should be prepared to receive observations ([ServerToClient.Observation]), muxed by the relevant [ObservableId]. - * In addition each observation itself may contain further [Observable]s, this case should behave the same as before. - * - * Additionally the client may send [ClientToServer.ObservablesClosed] messages indicating that certain observables - * aren't consumed anymore, which should subsequently stop the stream from the server. Note that some observations may - * already be in flight when this is sent, the client should handle this gracefully. - * - * An example session: - * Client Server - * ----------RpcRequest(RID0)-----------> // Client makes RPC request with ID "RID0" - * <----RpcReply(RID0, Payload(OID0))---- // Server sends reply containing an observable with ID "OID0" - * <---------Observation(OID0)----------- // Server sends observation onto "OID0" - * <---Observation(OID0, Payload(OID1))-- // Server sends another observation, this time containing another observable - * <---------Observation(OID1)----------- // Observation onto new "OID1" - * <---------Observation(OID0)----------- - * -----ObservablesClosed(OID0, OID1)---> // Client indicates it stopped consuming the Observables. - * <---------Observation(OID1)----------- // Observation was already in-flight before the previous message was processed - * (FIN) - * - * Note that multiple sessions like the above may interleave in an arbitrary fashion. - * - * Additionally the server may listen on client binding removals for cleanup using [RPC_CLIENT_BINDING_REMOVALS]. This - * requires the server to create a filter on the artemis notification address using [RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION] + * Constants and data types used by the RPC API. */ object RPCApi { private val TAG_FIELD_NAME = "tag" @@ -62,10 +56,15 @@ object RPCApi { private val OBSERVABLE_ID_FIELD_NAME = "observable-id" private val METHOD_NAME_FIELD_NAME = "method-name" - val RPC_SERVER_QUEUE_NAME = "rpc.server" - val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client" - val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals" - val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions" + /** Name of the Artemis queue on which the server receives RPC requests (as [ClientToServer.RpcRequest]). */ + const val RPC_SERVER_QUEUE_NAME = "rpc.server" + /** + * Prefix to Artemis queue names used by clients to receive communication back from a server. The full queue name + * should be of the form "rpc.client.<username>.<nonce>". + */ + const val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client" + const val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals" + const val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions" val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION = "${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " + @@ -83,12 +82,23 @@ object RPCApi { return ByteArray(bodySize).apply { bodyBuffer.readBytes(this) } } + /** + * Message content types which can be sent from a Corda client to a server. + */ sealed class ClientToServer { private enum class Tag { RPC_REQUEST, OBSERVABLES_CLOSED } + /** + * Request to a server to trigger the specified method with the provided arguments. + * + * @param clientAddress return address to contact the client at. + * @param id a unique ID for the request, which the server will use to identify its response with. + * @param methodName name of the method (procedure) to be called. + * @param arguments arguments to pass to the method, if any. + */ data class RpcRequest( val clientAddress: SimpleString, val id: RpcRequestId, @@ -141,6 +151,9 @@ object RPCApi { } } + /** + * Message content types which can be sent from a Corda server back to a client. + */ sealed class ServerToClient { private enum class Tag { RPC_REPLY, @@ -149,6 +162,7 @@ object RPCApi { abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage) + /** Reply in response to an [ClientToServer.RpcRequest]. */ data class RpcReply( val id: RpcRequestId, val result: Try diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt index d99f17dab7..3911943969 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt @@ -41,10 +41,17 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep constructor(msg: String) : this(msg, null) } +/** + * Thrown to indicate that the calling user does not have permission for something they have requested (for example + * calling a method). + */ @CordaSerializable class PermissionException(msg: String) : RuntimeException(msg) -// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly. +/** + * The Kryo used for the RPC wire protocol. + */ +// Every type in the wire protocol is listed here explicitly. // This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, // because we can see everything we're using in one place. class RPCKryo(observableSerializer: Serializer>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {