Add kdocs on service addressing

This commit is contained in:
Andras Slemmer 2016-12-13 15:32:53 +00:00 committed by exfalso
parent 6a796cef35
commit 4f44962962
7 changed files with 29 additions and 1 deletions

View File

@ -80,6 +80,7 @@ interface MessagingService {
*/
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
/** Given information about either a specific node or a service returns it's corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
/** Returns an address that refers to this node. */

View File

@ -4,6 +4,9 @@ import net.corda.core.crypto.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceEntry
/**
* Holds information about a [Party], which may refer to either a specific node or a service.
*/
sealed class PartyInfo(val party: Party) {
class Node(val node: NodeInfo) : PartyInfo(node.legalIdentity)
class Service(val service: ServiceEntry) : PartyInfo(service.identity)

View File

@ -72,6 +72,7 @@ interface DriverDSLExposedInterface {
* @param notaryName The legal name of the advertised distributed notary service.
* @param clusterSize Number of nodes to create for the cluster.
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
*/
fun startNotaryCluster(

View File

@ -76,6 +76,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
*
* @param queueName The name of the queue this address is associated with. This is either the direct peer queue or
* an advertised service queue.
* @param hostAndPort The address of the node.
*/
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object {
@ -87,6 +88,15 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
}
/**
* [ServiceAddress] implements [MessageRecipientGroup]. It holds a queue associated with a service advertised by
* zero or more nodes. Each advertising node has an associated consumer.
*
* By sending to such an address Artemis will pick a consumer (uses Round Robin by default) and sends the message
* there. We use this to establish sessions involving service counterparties.
*
* @param identity The service identity's owning key.
*/
data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup {
override val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}")
}

View File

@ -123,8 +123,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
*/
private fun destroyOrCreateBridges(change: MapChange) {
fun addAddresses(node: NodeInfo, target: HashSet<ArtemisPeerAddress>) {
// Add the node's address with the p2p queue.
val nodeAddress = node.address as ArtemisPeerAddress
target.add(nodeAddress)
// Add the node's address with service queues, one per service.
change.node.advertisedServices.forEach {
target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
}
@ -196,6 +198,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
queueName.startsWith(SERVICES_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
// Create a bridge for each node advertising the service.
for (nodeInfo in nodeInfos) {
maybeDeployBridgeForNode(queueName, nodeInfo)
}

View File

@ -481,6 +481,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage
/**
* [FlowSessionState] describes the session's state.
*
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
* specific peer or a service.
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
* [Initiated.peerParty], and the peer's sessionId has been initialised.
*/
sealed class FlowSessionState {
abstract val sendToParty: Party
class Initiating(

View File

@ -38,7 +38,8 @@ import kotlin.concurrent.thread
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
* testing).
*
* @param random The RNG used to choose which node to send to in case one sends to a service.
* @param servicePeerAllocationStrategy defines the strategy to be used when determining which peer to send to in case
* a service is addressed.
*/
@ThreadSafe
class InMemoryMessagingNetwork(
@ -74,6 +75,7 @@ class InMemoryMessagingNetwork(
private val messageReceiveQueues = HashMap<PeerHandle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
// Holds the mapping from services to peers advertising the service.
private val serviceToPeersMapping = HashMap<ServiceHandle, LinkedHashSet<PeerHandle>>()
val messagesInFlight = ReusableLatch()