diff --git a/src/main/kotlin/core/TransactionTools.kt b/core/src/main/kotlin/core/TransactionTools.kt similarity index 100% rename from src/main/kotlin/core/TransactionTools.kt rename to core/src/main/kotlin/core/TransactionTools.kt diff --git a/src/main/kotlin/core/messaging/Messaging.kt b/core/src/main/kotlin/core/messaging/Messaging.kt similarity index 100% rename from src/main/kotlin/core/messaging/Messaging.kt rename to core/src/main/kotlin/core/messaging/Messaging.kt diff --git a/src/main/kotlin/core/node/NodeInfo.kt b/core/src/main/kotlin/core/node/NodeInfo.kt similarity index 100% rename from src/main/kotlin/core/node/NodeInfo.kt rename to core/src/main/kotlin/core/node/NodeInfo.kt diff --git a/src/main/kotlin/core/node/PhysicalLocationStructures.kt b/core/src/main/kotlin/core/node/PhysicalLocationStructures.kt similarity index 100% rename from src/main/kotlin/core/node/PhysicalLocationStructures.kt rename to core/src/main/kotlin/core/node/PhysicalLocationStructures.kt diff --git a/src/main/kotlin/core/utilities/Logging.kt b/core/src/main/kotlin/core/utilities/Logging.kt similarity index 100% rename from src/main/kotlin/core/utilities/Logging.kt rename to core/src/main/kotlin/core/utilities/Logging.kt diff --git a/src/main/kotlin/core/utilities/RecordingMap.kt b/core/src/main/kotlin/core/utilities/RecordingMap.kt similarity index 100% rename from src/main/kotlin/core/utilities/RecordingMap.kt rename to core/src/main/kotlin/core/utilities/RecordingMap.kt diff --git a/src/main/kotlin/core/utilities/UntrustworthyData.kt b/core/src/main/kotlin/core/utilities/UntrustworthyData.kt similarity index 100% rename from src/main/kotlin/core/utilities/UntrustworthyData.kt rename to core/src/main/kotlin/core/utilities/UntrustworthyData.kt diff --git a/src/main/kotlin/protocols/AbstractRequestMessage.kt b/core/src/main/kotlin/protocols/AbstractRequestMessage.kt similarity index 100% rename from src/main/kotlin/protocols/AbstractRequestMessage.kt rename to core/src/main/kotlin/protocols/AbstractRequestMessage.kt diff --git a/src/main/kotlin/core/node/subsystems/InMemoryNetworkMapCache.kt b/src/main/kotlin/core/node/subsystems/InMemoryNetworkMapCache.kt new file mode 100644 index 0000000000..b259df419d --- /dev/null +++ b/src/main/kotlin/core/node/subsystems/InMemoryNetworkMapCache.kt @@ -0,0 +1,137 @@ +package core.node.subsystems + +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.SettableFuture +import core.Contract +import core.Party +import core.crypto.SecureHash +import core.messaging.MessagingService +import core.messaging.StateMachineManager +import core.messaging.runOnNextMessage +import core.messaging.send +import core.node.NodeInfo +import core.node.services.* +import core.random63BitValue +import core.serialization.deserialize +import core.serialization.serialize +import core.utilities.AddOrRemove +import java.security.SignatureException +import java.util.* +import javax.annotation.concurrent.ThreadSafe + +/** + * Extremely simple in-memory cache of the network map. + */ +@ThreadSafe +open class InMemoryNetworkMapCache() : NetworkMapCache { + override val networkMapNodes: List + get() = get(NetworkMapService.Type) + override val regulators: List + get() = get(RegulatorService.Type) + override val timestampingNodes: List + get() = get(TimestamperService.Type) + override val ratesOracleNodes: List + get() = get(NodeInterestRates.Type) + override val partyNodes: List + get() = registeredNodes.map { it.value } + + private var registeredForPush = false + protected var registeredNodes = Collections.synchronizedMap(HashMap()) + + override fun get() = registeredNodes.map { it.value } + override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value } + override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull() + + override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean, + ifChangedSinceVer: Int?): ListenableFuture { + if (subscribe && !registeredForPush) { + // Add handler to the network, for updates received from the remote network map service. + net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r -> + try { + val req = message.data.deserialize() + val hash = SecureHash.sha256(req.wireReg.serialize().bits) + val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, + NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits) + net.send(ackMessage, req.replyTo) + processUpdatePush(req) + } catch(e: NodeMapError) { + NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") + } catch(e: Exception) { + NetworkMapCache.logger.error("Exception processing update from network map service", e) + } + } + registeredForPush = true + } + + // Fetch the network map and register for updates at the same time + val sessionID = random63BitValue() + val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID) + + // Add a message handler for the response, and prepare a future to put the data into. + // Note that the message handler will run on the network thread (not this one). + val future = SettableFuture.create() + net.runOnNextMessage("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message -> + val resp = message.data.deserialize() + // We may not receive any nodes back, if the map hasn't changed since the version specified + resp.nodes?.forEach { processRegistration(it) } + future.set(Unit) + } + net.send("${NetworkMapService.FETCH_PROTOCOL_TOPIC}.0", req, service.address) + + return future + } + + override fun addNode(node: NodeInfo) { + registeredNodes[node.identity] = node + } + + override fun removeNode(node: NodeInfo) { + registeredNodes.remove(node.identity) + } + + /** + * Unsubscribes from updates from the given map service. + * + * @param service the network map service to listen to updates from. + */ + override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture { + // Fetch the network map and register for updates at the same time + val sessionID = random63BitValue() + val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID) + + // Add a message handler for the response, and prepare a future to put the data into. + // Note that the message handler will run on the network thread (not this one). + val future = SettableFuture.create() + net.runOnNextMessage("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.$sessionID", MoreExecutors.directExecutor()) { message -> + val resp = message.data.deserialize() + if (resp.confirmed) { + future.set(Unit) + } else { + future.setException(NetworkCacheError.DeregistrationFailed()) + } + } + net.send("${NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC}.0", req, service.address) + + return future + } + + fun processUpdatePush(req: NetworkMapService.Update) { + val reg: NodeRegistration + try { + reg = req.wireReg.verified() + } catch(e: SignatureException) { + throw NodeMapError.InvalidSignature() + } + processRegistration(reg) + } + + private fun processRegistration(reg: NodeRegistration) { + // TODO: Implement filtering by sequence number, so we only accept changes that are + // more recent than the latest change we've processed. + when (reg.type) { + AddOrRemove.ADD -> addNode(reg.node) + AddOrRemove.REMOVE -> removeNode(reg.node) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/core/node/subsystems/NetworkMapCache.kt b/src/main/kotlin/core/node/subsystems/NetworkMapCache.kt index 26fa26ec7e..3eb03d750a 100644 --- a/src/main/kotlin/core/node/subsystems/NetworkMapCache.kt +++ b/src/main/kotlin/core/node/subsystems/NetworkMapCache.kt @@ -1,27 +1,13 @@ package core.node.subsystems import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.MoreExecutors -import com.google.common.util.concurrent.SettableFuture import core.Contract import core.Party -import core.crypto.SecureHash import core.messaging.MessagingService import core.messaging.StateMachineManager -import core.messaging.runOnNextMessage -import core.messaging.send import core.node.NodeInfo -import core.node.services.* -import core.node.services.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC -import core.node.services.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC -import core.random63BitValue -import core.serialization.deserialize -import core.serialization.serialize -import core.utilities.AddOrRemove +import core.node.services.ServiceType import org.slf4j.LoggerFactory -import java.security.SignatureException -import java.util.* -import javax.annotation.concurrent.ThreadSafe /** * A network map contains lists of nodes on the network along with information about their identity keys, services @@ -104,122 +90,6 @@ interface NetworkMapCache { fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture } -/** - * Extremely simple in-memory cache of the network map. - */ -@ThreadSafe -open class InMemoryNetworkMapCache() : NetworkMapCache { - override val networkMapNodes: List - get() = get(NetworkMapService.Type) - override val regulators: List - get() = get(RegulatorService.Type) - override val timestampingNodes: List - get() = get(TimestamperService.Type) - override val ratesOracleNodes: List - get() = get(NodeInterestRates.Type) - override val partyNodes: List - get() = registeredNodes.map { it.value } - - private var registeredForPush = false - protected var registeredNodes = Collections.synchronizedMap(HashMap()) - - override fun get() = registeredNodes.map { it.value } - override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value } - override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull() - - override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean, - ifChangedSinceVer: Int?): ListenableFuture { - if (subscribe && !registeredForPush) { - // Add handler to the network, for updates received from the remote network map service. - net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r -> - try { - val req = message.data.deserialize() - val hash = SecureHash.sha256(req.wireReg.serialize().bits) - val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, - NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits) - net.send(ackMessage, req.replyTo) - processUpdatePush(req) - } catch(e: NodeMapError) { - NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") - } catch(e: Exception) { - NetworkMapCache.logger.error("Exception processing update from network map service", e) - } - } - registeredForPush = true - } - - // Fetch the network map and register for updates at the same time - val sessionID = random63BitValue() - val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID) - - // Add a message handler for the response, and prepare a future to put the data into. - // Note that the message handler will run on the network thread (not this one). - val future = SettableFuture.create() - net.runOnNextMessage("$FETCH_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message -> - val resp = message.data.deserialize() - // We may not receive any nodes back, if the map hasn't changed since the version specified - resp.nodes?.forEach { processRegistration(it) } - future.set(Unit) - } - net.send("$FETCH_PROTOCOL_TOPIC.0", req, service.address) - - return future - } - - override fun addNode(node: NodeInfo) { - registeredNodes[node.identity] = node - } - - override fun removeNode(node: NodeInfo) { - registeredNodes.remove(node.identity) - } - - /** - * Unsubscribes from updates from the given map service. - * - * @param service the network map service to listen to updates from. - */ - override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture { - // Fetch the network map and register for updates at the same time - val sessionID = random63BitValue() - val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID) - - // Add a message handler for the response, and prepare a future to put the data into. - // Note that the message handler will run on the network thread (not this one). - val future = SettableFuture.create() - net.runOnNextMessage("$SUBSCRIPTION_PROTOCOL_TOPIC.$sessionID", MoreExecutors.directExecutor()) { message -> - val resp = message.data.deserialize() - if (resp.confirmed) { - future.set(Unit) - } else { - future.setException(NetworkCacheError.DeregistrationFailed()) - } - } - net.send("$SUBSCRIPTION_PROTOCOL_TOPIC.0", req, service.address) - - return future - } - - fun processUpdatePush(req: NetworkMapService.Update) { - val reg: NodeRegistration - try { - reg = req.wireReg.verified() - } catch(e: SignatureException) { - throw NodeMapError.InvalidSignature() - } - processRegistration(reg) - } - - private fun processRegistration(reg: NodeRegistration) { - // TODO: Implement filtering by sequence number, so we only accept changes that are - // more recent than the latest change we've processed. - when (reg.type) { - AddOrRemove.ADD -> addNode(reg.node) - AddOrRemove.REMOVE -> removeNode(reg.node) - } - } -} - sealed class NetworkCacheError : Exception() { /** Indicates a failure to deregister, because of a rejected request from the remote node */ class DeregistrationFailed : NetworkCacheError() diff --git a/src/main/kotlin/core/node/subsystems/NodeWalletService.kt b/src/main/kotlin/core/node/subsystems/NodeWalletService.kt index 6f8bde6e84..b12222d8ab 100644 --- a/src/main/kotlin/core/node/subsystems/NodeWalletService.kt +++ b/src/main/kotlin/core/node/subsystems/NodeWalletService.kt @@ -5,8 +5,6 @@ import contracts.Cash import core.* import core.crypto.SecureHash import core.node.ServiceHub -import core.node.subsystems.Wallet -import core.node.subsystems.WalletService import core.utilities.loggerFor import core.utilities.trace import java.security.PublicKey @@ -26,7 +24,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService { // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference // to wallet somewhere. private class InnerState { - var wallet: Wallet = Wallet(emptyList>()) + var wallet: Wallet = WalletImpl(emptyList>()) } private val mutex = ThreadBox(InnerState()) @@ -105,7 +103,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService { "Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}" } - return Wallet(newStates) + return WalletImpl(newStates) } private class BalanceMetric : Gauge { diff --git a/src/main/kotlin/core/node/subsystems/Services.kt b/src/main/kotlin/core/node/subsystems/Services.kt index 9992f2982b..832c7e9f12 100644 --- a/src/main/kotlin/core/node/subsystems/Services.kt +++ b/src/main/kotlin/core/node/subsystems/Services.kt @@ -1,7 +1,6 @@ package core.node.subsystems import com.codahale.metrics.MetricRegistry -import contracts.Cash import core.* import core.crypto.SecureHash import core.node.services.AttachmentStorage @@ -26,8 +25,12 @@ val TOPIC_DEFAULT_POSTFIX = ".0" * because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to * change out from underneath you, even though the canonical currently-best-known wallet may change as we learn * about new transactions from our peers and generate new transactions that consume states ourselves. + * + * This absract class has no references to Cash contracts. */ -data class Wallet(val states: List>) { +abstract class Wallet { + abstract val states: List> + @Suppress("UNCHECKED_CAST") inline fun statesOfType() = states.filter { it.state is T } as List> @@ -35,13 +38,7 @@ data class Wallet(val states: List>) { * Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for * which we have no cash evaluate to null (not present in map), not 0. */ - val cashBalances: Map get() = states. - // Select the states we own which are cash, ignore the rest, take the amounts. - mapNotNull { (it.state as? Cash.State)?.amount }. - // Turn into a Map> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) } - groupBy { it.currency }. - // Collapse to Map by summing all the amounts of the same currency together. - mapValues { it.value.sumOrThrow() } + abstract val cashBalances: Map } /** diff --git a/src/main/kotlin/core/node/subsystems/WalletImpl.kt b/src/main/kotlin/core/node/subsystems/WalletImpl.kt new file mode 100644 index 0000000000..4400e59e8e --- /dev/null +++ b/src/main/kotlin/core/node/subsystems/WalletImpl.kt @@ -0,0 +1,31 @@ +package core.node.subsystems + +import contracts.Cash +import core.Amount +import core.ContractState +import core.StateAndRef +import core.sumOrThrow +import java.util.* + +/** + * A wallet (name may be temporary) wraps a set of states that are useful for us to keep track of, for instance, + * because we own them. This class represents an immutable, stable state of a wallet: it is guaranteed not to + * change out from underneath you, even though the canonical currently-best-known wallet may change as we learn + * about new transactions from our peers and generate new transactions that consume states ourselves. + * + * This concrete implementation references Cash contracts. + */ +class WalletImpl(override val states: List>) : Wallet() { + + /** + * Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for + * which we have no cash evaluate to null (not present in map), not 0. + */ + override val cashBalances: Map get() = states. + // Select the states we own which are cash, ignore the rest, take the amounts. + mapNotNull { (it.state as? Cash.State)?.amount }. + // Turn into a Map> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) } + groupBy { it.currency }. + // Collapse to Map by summing all the amounts of the same currency together. + mapValues { it.value.sumOrThrow() } +} \ No newline at end of file diff --git a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt index f7376a358c..8e0a2e2860 100644 --- a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt +++ b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt @@ -13,6 +13,7 @@ import core.node.storage.CheckpointStorage import core.node.subsystems.NodeWalletService import core.node.subsystems.StorageServiceImpl import core.node.subsystems.Wallet +import core.node.subsystems.WalletImpl import core.testing.InMemoryMessagingNetwork import core.testing.MockNetwork import core.testutils.* @@ -394,7 +395,7 @@ class TwoPartyTradeProtocolTests { arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() } } - val wallet = Wallet(listOf>(lookup("bob cash 1"), lookup("bob cash 2"))) + val wallet = WalletImpl(listOf>(lookup("bob cash 1"), lookup("bob cash 2"))) return Pair(wallet, listOf(eb1, bc1, bc2)) } @@ -410,7 +411,7 @@ class TwoPartyTradeProtocolTests { attachment(attachmentID) } - val wallet = Wallet(listOf>(lookup("alice's paper"))) + val wallet = WalletImpl(listOf>(lookup("alice's paper"))) return Pair(wallet, listOf(ap)) } }