Merged in plt-141-moves-only (pull request #91)

File moves to core module and split of interfaces and implementations in preparation for further moves
This commit is contained in:
Rick Parker 2016-05-13 10:47:24 +01:00
commit 3f77de0de8
14 changed files with 180 additions and 146 deletions

View File

@ -0,0 +1,137 @@
package core.node.subsystems
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.Contract
import core.Party
import core.crypto.SecureHash
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.messaging.send
import core.node.NodeInfo
import core.node.services.*
import core.random63BitValue
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.AddOrRemove
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>
get() = get(RegulatorService.Type)
override val timestampingNodes: List<NodeInfo>
get() = get(TimestamperService.Type)
override val ratesOracleNodes: List<NodeInfo>
get() = get(NodeInterestRates.Type)
override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value }
private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) {
NetworkMapCache.logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
future.set(Unit)
}
net.send("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.0", req, service.address)
return future
}
override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node
}
override fun removeNode(node: NodeInfo) {
registeredNodes.remove(node.identity)
}
/**
* Unsubscribes from updates from the given map service.
*
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
if (resp.confirmed) {
future.set(Unit)
} else {
future.setException(NetworkCacheError.DeregistrationFailed())
}
}
net.send("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.0", req, service.address)
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
val reg: NodeRegistration
try {
reg = req.wireReg.verified()
} catch(e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
processRegistration(reg)
}
private fun processRegistration(reg: NodeRegistration) {
// TODO: Implement filtering by sequence number, so we only accept changes that are
// more recent than the latest change we've processed.
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
}

View File

@ -1,27 +1,13 @@
package core.node.subsystems package core.node.subsystems
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.Contract import core.Contract
import core.Party import core.Party
import core.crypto.SecureHash
import core.messaging.MessagingService import core.messaging.MessagingService
import core.messaging.StateMachineManager import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.messaging.send
import core.node.NodeInfo import core.node.NodeInfo
import core.node.services.* import core.node.services.ServiceType
import core.node.services.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
import core.node.services.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import core.random63BitValue
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.AddOrRemove
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/** /**
* A network map contains lists of nodes on the network along with information about their identity keys, services * A network map contains lists of nodes on the network along with information about their identity keys, services
@ -104,122 +90,6 @@ interface NetworkMapCache {
fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
} }
/**
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>
get() = get(RegulatorService.Type)
override val timestampingNodes: List<NodeInfo>
get() = get(TimestamperService.Type)
override val ratesOracleNodes: List<NodeInfo>
get() = get(NodeInterestRates.Type)
override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value }
private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) {
NetworkMapCache.logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("$FETCH_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
future.set(Unit)
}
net.send("$FETCH_PROTOCOL_TOPIC.0", req, service.address)
return future
}
override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node
}
override fun removeNode(node: NodeInfo) {
registeredNodes.remove(node.identity)
}
/**
* Unsubscribes from updates from the given map service.
*
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage("$SUBSCRIPTION_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
if (resp.confirmed) {
future.set(Unit)
} else {
future.setException(NetworkCacheError.DeregistrationFailed())
}
}
net.send("$SUBSCRIPTION_PROTOCOL_TOPIC.0", req, service.address)
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
val reg: NodeRegistration
try {
reg = req.wireReg.verified()
} catch(e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
processRegistration(reg)
}
private fun processRegistration(reg: NodeRegistration) {
// TODO: Implement filtering by sequence number, so we only accept changes that are
// more recent than the latest change we've processed.
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
}
sealed class NetworkCacheError : Exception() { sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */ /** Indicates a failure to deregister, because of a rejected request from the remote node */
class DeregistrationFailed : NetworkCacheError() class DeregistrationFailed : NetworkCacheError()

View File

@ -5,8 +5,6 @@ import contracts.Cash
import core.* import core.*
import core.crypto.SecureHash import core.crypto.SecureHash
import core.node.ServiceHub import core.node.ServiceHub
import core.node.subsystems.Wallet
import core.node.subsystems.WalletService
import core.utilities.loggerFor import core.utilities.loggerFor
import core.utilities.trace import core.utilities.trace
import java.security.PublicKey import java.security.PublicKey
@ -26,7 +24,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
// to wallet somewhere. // to wallet somewhere.
private class InnerState { private class InnerState {
var wallet: Wallet = Wallet(emptyList<StateAndRef<OwnableState>>()) var wallet: Wallet = WalletImpl(emptyList<StateAndRef<OwnableState>>())
} }
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
@ -105,7 +103,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
"Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}" "Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}"
} }
return Wallet(newStates) return WalletImpl(newStates)
} }
private class BalanceMetric : Gauge<Long> { private class BalanceMetric : Gauge<Long> {

View File

@ -1,7 +1,6 @@
package core.node.subsystems package core.node.subsystems
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import contracts.Cash
import core.* import core.*
import core.crypto.SecureHash import core.crypto.SecureHash
import core.node.services.AttachmentStorage import core.node.services.AttachmentStorage
@ -26,8 +25,12 @@ val TOPIC_DEFAULT_POSTFIX = ".0"
* because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to * because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to
* change out from underneath you, even though the canonical currently-best-known wallet may change as we learn * change out from underneath you, even though the canonical currently-best-known wallet may change as we learn
* about new transactions from our peers and generate new transactions that consume states ourselves. * about new transactions from our peers and generate new transactions that consume states ourselves.
*
* This absract class has no references to Cash contracts.
*/ */
data class Wallet(val states: List<StateAndRef<ContractState>>) { abstract class Wallet {
abstract val states: List<StateAndRef<ContractState>>
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state is T } as List<StateAndRef<T>> inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state is T } as List<StateAndRef<T>>
@ -35,13 +38,7 @@ data class Wallet(val states: List<StateAndRef<ContractState>>) {
* Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for * Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
* which we have no cash evaluate to null (not present in map), not 0. * which we have no cash evaluate to null (not present in map), not 0.
*/ */
val cashBalances: Map<Currency, Amount> get() = states. abstract val cashBalances: Map<Currency, Amount>
// Select the states we own which are cash, ignore the rest, take the amounts.
mapNotNull { (it.state as? Cash.State)?.amount }.
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
groupBy { it.currency }.
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
mapValues { it.value.sumOrThrow() }
} }
/** /**

View File

@ -0,0 +1,31 @@
package core.node.subsystems
import contracts.Cash
import core.Amount
import core.ContractState
import core.StateAndRef
import core.sumOrThrow
import java.util.*
/**
* A wallet (name may be temporary) wraps a set of states that are useful for us to keep track of, for instance,
* because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to
* change out from underneath you, even though the canonical currently-best-known wallet may change as we learn
* about new transactions from our peers and generate new transactions that consume states ourselves.
*
* This concrete implementation references Cash contracts.
*/
class WalletImpl(override val states: List<StateAndRef<ContractState>>) : Wallet() {
/**
* Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
* which we have no cash evaluate to null (not present in map), not 0.
*/
override val cashBalances: Map<Currency, Amount> get() = states.
// Select the states we own which are cash, ignore the rest, take the amounts.
mapNotNull { (it.state as? Cash.State)?.amount }.
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
groupBy { it.currency }.
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
mapValues { it.value.sumOrThrow() }
}

View File

@ -13,6 +13,7 @@ import core.node.storage.CheckpointStorage
import core.node.subsystems.NodeWalletService import core.node.subsystems.NodeWalletService
import core.node.subsystems.StorageServiceImpl import core.node.subsystems.StorageServiceImpl
import core.node.subsystems.Wallet import core.node.subsystems.Wallet
import core.node.subsystems.WalletImpl
import core.testing.InMemoryMessagingNetwork import core.testing.InMemoryMessagingNetwork
import core.testing.MockNetwork import core.testing.MockNetwork
import core.testutils.* import core.testutils.*
@ -394,7 +395,7 @@ class TwoPartyTradeProtocolTests {
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() } arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
} }
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2"))) val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2")))
return Pair(wallet, listOf(eb1, bc1, bc2)) return Pair(wallet, listOf(eb1, bc1, bc2))
} }
@ -410,7 +411,7 @@ class TwoPartyTradeProtocolTests {
attachment(attachmentID) attachment(attachmentID)
} }
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("alice's paper"))) val wallet = WalletImpl(listOf<StateAndRef<Cash.State>>(lookup("alice's paper")))
return Pair(wallet, listOf(ap)) return Pair(wallet, listOf(ap))
} }
} }