New counterparty model and subscription mechanism to retrieve and track counterparty changes in network map

New transaction creation screen for creating new cash transactions, using party info source from the counterparty model.
This commit is contained in:
Patrick Kuo
2016-10-07 17:07:23 +01:00
parent d7ca215f7d
commit d4362fbd78
23 changed files with 461 additions and 71 deletions

View File

@ -5,7 +5,9 @@ import com.r3corda.contracts.asset.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
@ -33,6 +35,10 @@ class ServerRPCOps(
) : CordaRPCOps {
override val protocolVersion: Int = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
return services.networkMapCache.track()
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
return databaseTransaction(database) {
val (vault, updates) = services.vaultService.track()
@ -153,5 +159,4 @@ class ServerRPCOps(
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
}
}

View File

@ -3,6 +3,8 @@ package com.r3corda.node.services.messaging
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
@ -103,9 +105,15 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>
/**
* Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
*/
@RPCReturnsObservables
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Executes the given command, possibly triggering cash creation etc.
* TODO: The signature of this is weird because it's the remains of an old service call, we should have a call for each command instead.
*/
fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult
}
}

View File

@ -5,13 +5,17 @@ import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.DefaultSerializers
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.net.HostAndPort
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.ErrorOr
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.PhysicalLocation
import com.r3corda.core.node.ServiceEntry
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
@ -163,17 +167,42 @@ private class RPCKryo(private val observableSerializer: Serializer<Observable<An
register(setOf(Unit).javaClass) // SingletonSet
register(TransactionBuildResult.ProtocolStarted::class.java)
register(TransactionBuildResult.Failed::class.java)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
register(NetworkMapCache.MapChange::class.java)
register(NetworkMapCache.MapChangeType::class.java)
register(ArtemisMessagingComponent.NodeAddress::class.java,
read = { kryo, input -> ArtemisMessagingComponent.NodeAddress(parsePublicKeyBase58(kryo.readObject(input, String::class.java)), kryo.readObject(input, HostAndPort::class.java)) },
write = { kryo, output, nodeAddress ->
kryo.writeObject(output, nodeAddress.identity.toBase58String())
kryo.writeObject(output, nodeAddress.hostAndPort)
}
)
register(HostAndPort::class.java)
register(ServiceInfo::class.java, read = { kryo, input -> ServiceInfo.parse(input.readString()) }, write = Kryo::writeObject)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(IllegalArgumentException::class.java)
// Kryo couldn't serialize Collections.unmodifiableCollection in Throwable correctly, causing null pointer exception when try to access the deserialize object.
register(NoSuchElementException::class.java, JavaSerializer())
register(RPCException::class.java)
register(Array<StackTraceElement>::class.java, object : Serializer<Array<StackTraceElement>>() {
override fun read(kryo: Kryo, input: Input, type: Class<Array<StackTraceElement>>): Array<StackTraceElement> = emptyArray()
override fun write(kryo: Kryo, output: Output, `object`: Array<StackTraceElement>) {}
})
register(Array<StackTraceElement>::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
}
// Helper method, attempt to reduce boiler plate code
private fun <T> register(type: Class<T>, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) {
register(type, object : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>?): T {
return read(kryo, input)
}
override fun write(kryo: Kryo, output: Output, o: T) {
write(kryo, output, o)
}
})
}
val observableRegistration: Registration? = if (observableSerializer != null) register(Observable::class.java, observableSerializer) else null
override fun getRegistration(type: Class<*>): Registration {

View File

@ -3,6 +3,7 @@ package com.r3corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.bufferUntilSubscribed
import com.r3corda.core.contracts.Contract
import com.r3corda.core.crypto.Party
import com.r3corda.core.map
@ -23,6 +24,7 @@ import com.r3corda.node.services.network.NetworkMapService.Companion.FETCH_PROTO
import com.r3corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.FetchMapResponse
import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.sendRequest
import rx.Observable
@ -54,6 +56,18 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> {
synchronized(_changed) {
fun NodeInfo.isCordaService(): Boolean {
return advertisedServices.any { it.info.type in setOf(SimpleNotaryService.type, NetworkMapService.type) }
}
val currentParties = partyNodes.filter { !it.isCordaService() }
val changes = changed.filter { !it.node.isCordaService() }
return Pair(currentParties, changes.bufferUntilSubscribed())
}
}
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.any { it.info.type.isSubTypeOf(serviceType) } }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
@ -96,17 +110,22 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
}
override fun addNode(node: NodeInfo) {
val oldValue = registeredNodes.put(node.legalIdentity, node)
if (oldValue == null) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Added))
} else if(oldValue != node) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Modified))
synchronized(_changed) {
val oldValue = registeredNodes.put(node.legalIdentity, node)
if (oldValue == null) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Added))
} else if (oldValue != node) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Modified))
}
}
}
override fun removeNode(node: NodeInfo) {
val oldValue = registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange(node, oldValue, MapChangeType.Removed))
synchronized(_changed) {
val oldValue = registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange(node, oldValue, MapChangeType.Removed))
}
}
/**