Nodes part of a group identity (e.g. notary) now advertise the whole group Party on the network. When sending a message to a group, a representative node advertising the group identity is first chosen (at random), and its legal identity is used for communication. Currently we assume that a single legal identity can't be advertised by more than one node (the PublicKeyTree of an identity is used for Artemis queue names and we need to do more work to properly map a single queue to multiple nodes)

This commit is contained in:
Andrius Dagys
2016-11-14 10:51:13 +00:00
parent c33c55eb20
commit d855b10817
9 changed files with 96 additions and 21 deletions

View File

@ -150,7 +150,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS) {
try {
val identity = parseKeyFromQueueName(queueName.toString())
val nodeInfo = networkMapCache.getNodeByPublicKey(identity)
val nodeInfo = networkMapCache.getNodeByPublicKeyTree(identity)
if (nodeInfo != null) {
maybeDeployBridgeForAddress(queueName, nodeInfo.address)
} else {

View File

@ -18,6 +18,7 @@ import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.node.services.ServiceType
import net.corda.core.randomOrNull
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -35,6 +36,8 @@ import javax.annotation.concurrent.ThreadSafe
/**
* Extremely simple in-memory cache of the network map.
*
* TODO: some method implementations can be moved up to [NetworkMapCache]
*/
@ThreadSafe
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache {
@ -65,18 +68,40 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.any { it.info.type.isSubTypeOf(serviceType) } }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun getNodeByLegalName(name: String) = get().singleOrNull { it.legalIdentity.name == name }
override fun getNodeByPublicKey(publicKey: PublicKeyTree): NodeInfo? {
override fun getNodeByPublicKeyTree(publicKeyTree: PublicKeyTree): NodeInfo? {
// Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
val candidates = get().filter {
(it.legalIdentity.owningKey == publicKey)
|| it.advertisedServices.any { it.identity.owningKey == publicKey }
(it.legalIdentity.owningKey == publicKeyTree)
|| it.advertisedServices.any { it.identity.owningKey == publicKeyTree }
}
if (candidates.size > 1) {
throw IllegalStateException("Found more than one match for key $publicKey")
throw IllegalStateException("Found more than one match for key $publicKeyTree")
}
return candidates.singleOrNull()
}
override fun getRepresentativeNode(party: Party): NodeInfo? {
return partyNodes.randomOrNull { it.legalIdentity == party || it.advertisedServices.any { it.identity == party } }
}
override fun getNotary(name: String): Party? {
val notaryNode = notaryNodes.randomOrNull { it.advertisedServices.any { it.info.type.isSubTypeOf(ServiceType.notary) && it.info.name == name } }
return notaryNode?.notaryIdentity
}
override fun getAnyNotary(type: ServiceType?): Party? {
val nodes = if (type == null) {
notaryNodes
} else {
require(type != ServiceType.notary && type.isSubTypeOf(ServiceType.notary)) { "The provided type must be a specific notary sub-type" }
notaryNodes.filter { it.advertisedServices.any { it.info.type == type } }
}
return nodes.randomOrNull()?.notaryIdentity
}
override fun isNotary(party: Party) = notaryNodes.any { it.notaryIdentity == party }
override fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {

View File

@ -182,11 +182,20 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
/**
* Creates a new session. The provided [otherParty] can be an identity of any advertised service on the network,
* and might be advertised by more than one node. Therefore we first choose a single node that advertises it
* and use its *legal identity* for communication. At the moment a single node can compose its legal identity out of
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
*/
@Suspendable
private fun startNewSession(otherParty: Party, sessionProtocol: ProtocolLogic<*>, firstPayload: Any?) : ProtocolSession {
val session = ProtocolSession(sessionProtocol, otherParty, random63BitValue(), null)
openSessions[Pair(sessionProtocol, otherParty)] = session
val counterpartyProtocol = sessionProtocol.getCounterpartyMarker(otherParty).name
val node = serviceHub.networkMapCache.getRepresentativeNode(otherParty) ?: throw IllegalArgumentException("Don't know about party $otherParty")
val nodeIdentity = node.legalIdentity
logger.trace { "Initiating a new session with $nodeIdentity (representative of $otherParty)" }
val session = ProtocolSession(sessionProtocol, nodeIdentity, random63BitValue(), null)
openSessions[Pair(sessionProtocol, nodeIdentity)] = session
val counterpartyProtocol = sessionProtocol.getCounterpartyMarker(nodeIdentity).name
val sessionInit = SessionInit(session.ourSessionId, serviceHub.myInfo.legalIdentity, counterpartyProtocol, firstPayload)
val sessionInitResponse = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) {
@ -194,7 +203,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
return session
} else {
sessionInitResponse as SessionReject
throw ProtocolSessionException("Party $otherParty rejected session attempt: ${sessionInitResponse.errorMessage}")
throw ProtocolSessionException("Party $nodeIdentity rejected session attempt: ${sessionInitResponse.errorMessage}")
}
}

View File

@ -7,6 +7,7 @@ import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
@ -28,7 +29,6 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.isolatedTransaction
import kotlinx.support.jdk8.collections.removeIf
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import rx.Observable
@ -432,7 +432,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun sendSessionMessage(party: Party, message: SessionMessage, psm: ProtocolStateMachineImpl<*>?) {
val node = serviceHub.networkMapCache.getNodeByPublicKey(party.owningKey)
val node = serviceHub.networkMapCache.getNodeByPublicKeyTree(party.owningKey)
?: throw IllegalArgumentException("Don't know about party $party")
val logger = psm?.logger ?: logger
logger.trace { "Sending $message to party $party" }

View File

@ -34,12 +34,12 @@ class InMemoryNetworkMapCacheTest {
val nodeB = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node B", keyPair, ServiceInfo(NetworkMapService.type))
// Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.netMapCache.getNodeByPublicKey(keyPair.public.tree), nodeA.info)
assertEquals(nodeA.netMapCache.getNodeByPublicKeyTree(keyPair.public.tree), nodeA.info)
nodeA.netMapCache.addNode(nodeB.info)
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByPublicKey(keyPair.public.tree)
nodeA.netMapCache.getNodeByPublicKeyTree(keyPair.public.tree)
}
}
}