mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
File moves to core module and split of interfaces and implementations in preparation for further moves
This commit is contained in:
parent
4271693b85
commit
a18e7b06bf
137
src/main/kotlin/core/node/subsystems/InMemoryNetworkMapCache.kt
Normal file
137
src/main/kotlin/core/node/subsystems/InMemoryNetworkMapCache.kt
Normal file
@ -0,0 +1,137 @@
|
||||
package core.node.subsystems
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import core.Contract
|
||||
import core.Party
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.StateMachineManager
|
||||
import core.messaging.runOnNextMessage
|
||||
import core.messaging.send
|
||||
import core.node.NodeInfo
|
||||
import core.node.services.*
|
||||
import core.random63BitValue
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
import core.utilities.AddOrRemove
|
||||
import java.security.SignatureException
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Extremely simple in-memory cache of the network map.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryNetworkMapCache() : NetworkMapCache {
|
||||
override val networkMapNodes: List<NodeInfo>
|
||||
get() = get(NetworkMapService.Type)
|
||||
override val regulators: List<NodeInfo>
|
||||
get() = get(RegulatorService.Type)
|
||||
override val timestampingNodes: List<NodeInfo>
|
||||
get() = get(TimestamperService.Type)
|
||||
override val ratesOracleNodes: List<NodeInfo>
|
||||
get() = get(NodeInterestRates.Type)
|
||||
override val partyNodes: List<NodeInfo>
|
||||
get() = registeredNodes.map { it.value }
|
||||
|
||||
private var registeredForPush = false
|
||||
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
|
||||
|
||||
override fun get() = registeredNodes.map { it.value }
|
||||
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
|
||||
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
|
||||
|
||||
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
|
||||
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
|
||||
if (subscribe && !registeredForPush) {
|
||||
// Add handler to the network, for updates received from the remote network map service.
|
||||
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
|
||||
try {
|
||||
val req = message.data.deserialize<NetworkMapService.Update>()
|
||||
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
|
||||
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
|
||||
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
|
||||
net.send(ackMessage, req.replyTo)
|
||||
processUpdatePush(req)
|
||||
} catch(e: NodeMapError) {
|
||||
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
|
||||
} catch(e: Exception) {
|
||||
NetworkMapCache.logger.error("Exception processing update from network map service", e)
|
||||
}
|
||||
}
|
||||
registeredForPush = true
|
||||
}
|
||||
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
|
||||
// We may not receive any nodes back, if the map hasn't changed since the version specified
|
||||
resp.nodes?.forEach { processRegistration(it) }
|
||||
future.set(Unit)
|
||||
}
|
||||
net.send("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
registeredNodes[node.identity] = node
|
||||
}
|
||||
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
registeredNodes.remove(node.identity)
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from updates from the given map service.
|
||||
*
|
||||
* @param service the network map service to listen to updates from.
|
||||
*/
|
||||
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
|
||||
if (resp.confirmed) {
|
||||
future.set(Unit)
|
||||
} else {
|
||||
future.setException(NetworkCacheError.DeregistrationFailed())
|
||||
}
|
||||
}
|
||||
net.send("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
fun processUpdatePush(req: NetworkMapService.Update) {
|
||||
val reg: NodeRegistration
|
||||
try {
|
||||
reg = req.wireReg.verified()
|
||||
} catch(e: SignatureException) {
|
||||
throw NodeMapError.InvalidSignature()
|
||||
}
|
||||
processRegistration(reg)
|
||||
}
|
||||
|
||||
private fun processRegistration(reg: NodeRegistration) {
|
||||
// TODO: Implement filtering by sequence number, so we only accept changes that are
|
||||
// more recent than the latest change we've processed.
|
||||
when (reg.type) {
|
||||
AddOrRemove.ADD -> addNode(reg.node)
|
||||
AddOrRemove.REMOVE -> removeNode(reg.node)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,27 +1,13 @@
|
||||
package core.node.subsystems
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import core.Contract
|
||||
import core.Party
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.StateMachineManager
|
||||
import core.messaging.runOnNextMessage
|
||||
import core.messaging.send
|
||||
import core.node.NodeInfo
|
||||
import core.node.services.*
|
||||
import core.node.services.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
|
||||
import core.node.services.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
|
||||
import core.random63BitValue
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
import core.utilities.AddOrRemove
|
||||
import core.node.services.ServiceType
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.security.SignatureException
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
||||
@ -104,122 +90,6 @@ interface NetworkMapCache {
|
||||
fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
|
||||
}
|
||||
|
||||
/**
|
||||
* Extremely simple in-memory cache of the network map.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryNetworkMapCache() : NetworkMapCache {
|
||||
override val networkMapNodes: List<NodeInfo>
|
||||
get() = get(NetworkMapService.Type)
|
||||
override val regulators: List<NodeInfo>
|
||||
get() = get(RegulatorService.Type)
|
||||
override val timestampingNodes: List<NodeInfo>
|
||||
get() = get(TimestamperService.Type)
|
||||
override val ratesOracleNodes: List<NodeInfo>
|
||||
get() = get(NodeInterestRates.Type)
|
||||
override val partyNodes: List<NodeInfo>
|
||||
get() = registeredNodes.map { it.value }
|
||||
|
||||
private var registeredForPush = false
|
||||
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
|
||||
|
||||
override fun get() = registeredNodes.map { it.value }
|
||||
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
|
||||
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
|
||||
|
||||
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
|
||||
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
|
||||
if (subscribe && !registeredForPush) {
|
||||
// Add handler to the network, for updates received from the remote network map service.
|
||||
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
|
||||
try {
|
||||
val req = message.data.deserialize<NetworkMapService.Update>()
|
||||
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
|
||||
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
|
||||
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
|
||||
net.send(ackMessage, req.replyTo)
|
||||
processUpdatePush(req)
|
||||
} catch(e: NodeMapError) {
|
||||
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
|
||||
} catch(e: Exception) {
|
||||
NetworkMapCache.logger.error("Exception processing update from network map service", e)
|
||||
}
|
||||
}
|
||||
registeredForPush = true
|
||||
}
|
||||
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage("$FETCH_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
|
||||
// We may not receive any nodes back, if the map hasn't changed since the version specified
|
||||
resp.nodes?.forEach { processRegistration(it) }
|
||||
future.set(Unit)
|
||||
}
|
||||
net.send("$FETCH_PROTOCOL_TOPIC.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
registeredNodes[node.identity] = node
|
||||
}
|
||||
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
registeredNodes.remove(node.identity)
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from updates from the given map service.
|
||||
*
|
||||
* @param service the network map service to listen to updates from.
|
||||
*/
|
||||
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = SettableFuture.create<Unit>()
|
||||
net.runOnNextMessage("$SUBSCRIPTION_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
|
||||
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
|
||||
if (resp.confirmed) {
|
||||
future.set(Unit)
|
||||
} else {
|
||||
future.setException(NetworkCacheError.DeregistrationFailed())
|
||||
}
|
||||
}
|
||||
net.send("$SUBSCRIPTION_PROTOCOL_TOPIC.0", req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
fun processUpdatePush(req: NetworkMapService.Update) {
|
||||
val reg: NodeRegistration
|
||||
try {
|
||||
reg = req.wireReg.verified()
|
||||
} catch(e: SignatureException) {
|
||||
throw NodeMapError.InvalidSignature()
|
||||
}
|
||||
processRegistration(reg)
|
||||
}
|
||||
|
||||
private fun processRegistration(reg: NodeRegistration) {
|
||||
// TODO: Implement filtering by sequence number, so we only accept changes that are
|
||||
// more recent than the latest change we've processed.
|
||||
when (reg.type) {
|
||||
AddOrRemove.ADD -> addNode(reg.node)
|
||||
AddOrRemove.REMOVE -> removeNode(reg.node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed class NetworkCacheError : Exception() {
|
||||
/** Indicates a failure to deregister, because of a rejected request from the remote node */
|
||||
class DeregistrationFailed : NetworkCacheError()
|
||||
|
@ -5,8 +5,6 @@ import contracts.Cash
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.node.ServiceHub
|
||||
import core.node.subsystems.Wallet
|
||||
import core.node.subsystems.WalletService
|
||||
import core.utilities.loggerFor
|
||||
import core.utilities.trace
|
||||
import java.security.PublicKey
|
||||
@ -26,7 +24,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
|
||||
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
|
||||
// to wallet somewhere.
|
||||
private class InnerState {
|
||||
var wallet: Wallet = Wallet(emptyList<StateAndRef<OwnableState>>())
|
||||
var wallet: Wallet = WalletImpl(emptyList<StateAndRef<OwnableState>>())
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
@ -105,7 +103,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
|
||||
"Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}"
|
||||
}
|
||||
|
||||
return Wallet(newStates)
|
||||
return WalletImpl(newStates)
|
||||
}
|
||||
|
||||
private class BalanceMetric : Gauge<Long> {
|
||||
|
@ -1,7 +1,6 @@
|
||||
package core.node.subsystems
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import contracts.Cash
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.node.services.AttachmentStorage
|
||||
@ -26,8 +25,12 @@ val TOPIC_DEFAULT_POSTFIX = ".0"
|
||||
* because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to
|
||||
* change out from underneath you, even though the canonical currently-best-known wallet may change as we learn
|
||||
* about new transactions from our peers and generate new transactions that consume states ourselves.
|
||||
*
|
||||
* This absract class has no references to Cash contracts.
|
||||
*/
|
||||
data class Wallet(val states: List<StateAndRef<ContractState>>) {
|
||||
abstract class Wallet {
|
||||
abstract val states: List<StateAndRef<ContractState>>
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state is T } as List<StateAndRef<T>>
|
||||
|
||||
@ -35,13 +38,7 @@ data class Wallet(val states: List<StateAndRef<ContractState>>) {
|
||||
* Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
|
||||
* which we have no cash evaluate to null (not present in map), not 0.
|
||||
*/
|
||||
val cashBalances: Map<Currency, Amount> get() = states.
|
||||
// Select the states we own which are cash, ignore the rest, take the amounts.
|
||||
mapNotNull { (it.state as? Cash.State)?.amount }.
|
||||
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
|
||||
groupBy { it.currency }.
|
||||
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
|
||||
mapValues { it.value.sumOrThrow() }
|
||||
abstract val cashBalances: Map<Currency, Amount>
|
||||
}
|
||||
|
||||
/**
|
||||
|
31
src/main/kotlin/core/node/subsystems/WalletImpl.kt
Normal file
31
src/main/kotlin/core/node/subsystems/WalletImpl.kt
Normal file
@ -0,0 +1,31 @@
|
||||
package core.node.subsystems
|
||||
|
||||
import contracts.Cash
|
||||
import core.Amount
|
||||
import core.ContractState
|
||||
import core.StateAndRef
|
||||
import core.sumOrThrow
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A wallet (name may be temporary) wraps a set of states that are useful for us to keep track of, for instance,
|
||||
* because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to
|
||||
* change out from underneath you, even though the canonical currently-best-known wallet may change as we learn
|
||||
* about new transactions from our peers and generate new transactions that consume states ourselves.
|
||||
*
|
||||
* This concrete implementation references Cash contracts.
|
||||
*/
|
||||
class WalletImpl(override val states: List<StateAndRef<ContractState>>) : Wallet() {
|
||||
|
||||
/**
|
||||
* Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
|
||||
* which we have no cash evaluate to null (not present in map), not 0.
|
||||
*/
|
||||
override val cashBalances: Map<Currency, Amount> get() = states.
|
||||
// Select the states we own which are cash, ignore the rest, take the amounts.
|
||||
mapNotNull { (it.state as? Cash.State)?.amount }.
|
||||
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
|
||||
groupBy { it.currency }.
|
||||
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
|
||||
mapValues { it.value.sumOrThrow() }
|
||||
}
|
@ -13,6 +13,7 @@ import core.node.storage.CheckpointStorage
|
||||
import core.node.subsystems.NodeWalletService
|
||||
import core.node.subsystems.StorageServiceImpl
|
||||
import core.node.subsystems.Wallet
|
||||
import core.node.subsystems.WalletImpl
|
||||
import core.testing.InMemoryMessagingNetwork
|
||||
import core.testing.MockNetwork
|
||||
import core.testutils.*
|
||||
@ -394,7 +395,7 @@ class TwoPartyTradeProtocolTests {
|
||||
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
|
||||
}
|
||||
|
||||
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2")))
|
||||
val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2")))
|
||||
return Pair(wallet, listOf(eb1, bc1, bc2))
|
||||
}
|
||||
|
||||
@ -410,7 +411,7 @@ class TwoPartyTradeProtocolTests {
|
||||
attachment(attachmentID)
|
||||
}
|
||||
|
||||
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("alice's paper")))
|
||||
val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("alice's paper")))
|
||||
return Pair(wallet, listOf(ap))
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user