From c8130581a98bcf0a893bfab6510375bbc5d7d287 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 18 May 2016 10:15:03 +0100 Subject: [PATCH] Push internal subsystems into node --- .../kotlin/protocols/TwoPartyTradeProtocol.kt | 16 ------- .../kotlin/core/node/subsystems/Services.kt | 4 -- .../core/messaging/StateMachineManager.kt | 4 +- .../src/main/kotlin/core/node/AbstractNode.kt | 15 +++--- .../core/node/storage/CheckpointStorage.kt | 0 .../node/subsystems/StorageServiceImpl.kt | 1 - node/src/test/kotlin/core/MockServices.kt | 2 +- .../messaging/TwoPartyTradeProtocolTests.kt | 48 +++++++++++++------ 8 files changed, 45 insertions(+), 45 deletions(-) rename {node => contracts}/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt (93%) rename {core => node}/src/main/kotlin/core/node/storage/CheckpointStorage.kt (100%) diff --git a/node/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt b/contracts/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt similarity index 93% rename from node/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt rename to contracts/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt index bb123c82ce..6da66c4ab9 100644 --- a/node/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt +++ b/contracts/src/main/kotlin/protocols/TwoPartyTradeProtocol.kt @@ -1,7 +1,6 @@ package protocols import co.paralleluniverse.fibers.Suspendable -import com.google.common.util.concurrent.ListenableFuture import contracts.Cash import contracts.sumCashBy import core.contracts.* @@ -9,7 +8,6 @@ import core.crypto.DigitalSignature import core.crypto.Party import core.crypto.signWithECDSA import core.messaging.SingleMessageRecipient -import core.messaging.StateMachineManager import core.node.NodeInfo import core.protocols.ProtocolLogic import core.random63BitValue @@ -46,20 +44,6 @@ import java.security.SignatureException object TwoPartyTradeProtocol { val TRADE_TOPIC = "platform.trade" - fun runSeller(smm: StateMachineManager, notary: NodeInfo, - otherSide: SingleMessageRecipient, assetToSell: StateAndRef, price: Amount, - myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture { - val seller = Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID) - return smm.add("${TRADE_TOPIC}.seller", seller) - } - - fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo, - otherSide: SingleMessageRecipient, acceptablePrice: Amount, typeToBuy: Class, - sessionID: Long): ListenableFuture { - val buyer = Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID) - return smm.add("$TRADE_TOPIC.buyer", buyer) - } - class UnacceptablePriceException(val givenPrice: Amount) : Exception() class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() { override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName" diff --git a/core/src/main/kotlin/core/node/subsystems/Services.kt b/core/src/main/kotlin/core/node/subsystems/Services.kt index f9978f71fa..5de89fdb5e 100644 --- a/core/src/main/kotlin/core/node/subsystems/Services.kt +++ b/core/src/main/kotlin/core/node/subsystems/Services.kt @@ -1,12 +1,10 @@ package core.node.subsystems import com.codahale.metrics.MetricRegistry -import core.* import core.contracts.* import core.crypto.Party import core.crypto.SecureHash import core.node.services.AttachmentStorage -import core.node.storage.CheckpointStorage import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -128,8 +126,6 @@ interface StorageService { */ val validatedTransactions: MutableMap - val checkpointStorage: CheckpointStorage - /** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */ val attachments: AttachmentStorage diff --git a/node/src/main/kotlin/core/messaging/StateMachineManager.kt b/node/src/main/kotlin/core/messaging/StateMachineManager.kt index 34fdb8cdd3..0efe64fb0a 100644 --- a/node/src/main/kotlin/core/messaging/StateMachineManager.kt +++ b/node/src/main/kotlin/core/messaging/StateMachineManager.kt @@ -9,6 +9,7 @@ import com.google.common.base.Throwables import com.google.common.util.concurrent.ListenableFuture import core.node.ServiceHub import core.node.storage.Checkpoint +import core.node.storage.CheckpointStorage import core.protocols.ProtocolLogic import core.protocols.ProtocolStateMachine import core.protocols.ProtocolStateMachineImpl @@ -53,14 +54,13 @@ import javax.annotation.concurrent.ThreadSafe * TODO: Implement stub/skel classes that provide a basic RPC framework on top of this. */ @ThreadSafe -class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExecutor) { +class StateMachineManager(val serviceHub: ServiceHub, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) { inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor) val scheduler = FiberScheduler() // This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect // them across node restarts. - private val checkpointStorage = serviceHub.storageService.checkpointStorage // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. private val stateMachines = synchronizedMap(HashMap, Checkpoint>()) diff --git a/node/src/main/kotlin/core/node/AbstractNode.kt b/node/src/main/kotlin/core/node/AbstractNode.kt index c7a4dc96c8..70da143f5e 100644 --- a/node/src/main/kotlin/core/node/AbstractNode.kt +++ b/node/src/main/kotlin/core/node/AbstractNode.kt @@ -78,6 +78,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, open fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity] lateinit var storage: StorageService + lateinit var checkpointStorage: CheckpointStorage lateinit var smm: StateMachineManager lateinit var wallet: WalletService lateinit var keyManagement: E2ETestKeyManagementService @@ -99,9 +100,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, require(!started) { "Node has already been started" } log.info("Node starting up ...") - storage = initialiseStorageService(dir) + val storageServices = initialiseStorageService(dir) + storage = storageServices.first + checkpointStorage = storageServices.second net = makeMessagingService() - smm = StateMachineManager(services, serverThread) + smm = StateMachineManager(services, checkpointStorage, serverThread) wallet = NodeWalletService(services) keyManagement = E2ETestKeyManagementService() makeInterestRatesOracleService() @@ -216,16 +219,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, protected abstract fun startMessagingService() - protected open fun initialiseStorageService(dir: Path): StorageService { + protected open fun initialiseStorageService(dir: Path): Pair { val attachments = makeAttachmentStorage(dir) val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints")) _servicesThatAcceptUploads += attachments val (identity, keypair) = obtainKeyPair(dir) - return constructStorageService(attachments, checkpointStorage, keypair, identity) + return Pair(constructStorageService(attachments, keypair, identity),checkpointStorage) } - protected open fun constructStorageService(attachments: NodeAttachmentService, checkpointStorage: CheckpointStorage, keypair: KeyPair, identity: Party) = - StorageServiceImpl(attachments, checkpointStorage, keypair, identity) + protected open fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party) = + StorageServiceImpl(attachments, keypair, identity) private fun obtainKeyPair(dir: Path): Pair { // Load the private identity key, creating it if necessary. The identity key is a long term well known key that diff --git a/core/src/main/kotlin/core/node/storage/CheckpointStorage.kt b/node/src/main/kotlin/core/node/storage/CheckpointStorage.kt similarity index 100% rename from core/src/main/kotlin/core/node/storage/CheckpointStorage.kt rename to node/src/main/kotlin/core/node/storage/CheckpointStorage.kt diff --git a/node/src/main/kotlin/core/node/subsystems/StorageServiceImpl.kt b/node/src/main/kotlin/core/node/subsystems/StorageServiceImpl.kt index ec5d38fe70..2de161ef3d 100644 --- a/node/src/main/kotlin/core/node/subsystems/StorageServiceImpl.kt +++ b/node/src/main/kotlin/core/node/subsystems/StorageServiceImpl.kt @@ -11,7 +11,6 @@ import java.security.KeyPair import java.util.* open class StorageServiceImpl(override val attachments: AttachmentStorage, - override val checkpointStorage: CheckpointStorage, override val myLegalIdentityKey: KeyPair, override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), // This parameter is for unit tests that want to observe operation details. diff --git a/node/src/test/kotlin/core/MockServices.kt b/node/src/test/kotlin/core/MockServices.kt index 6bd2f05d43..15ddb95d47 100644 --- a/node/src/test/kotlin/core/MockServices.kt +++ b/node/src/test/kotlin/core/MockServices.kt @@ -91,7 +91,7 @@ class MockCheckpointStorage : CheckpointStorage { @ThreadSafe -class MockStorageService : StorageServiceImpl(MockAttachmentStorage(), MockCheckpointStorage(), generateKeyPair()) +class MockStorageService : StorageServiceImpl(MockAttachmentStorage(), generateKeyPair()) class MockServices( customWallet: WalletService? = null, diff --git a/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt index 45253076d1..11670b7062 100644 --- a/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt @@ -1,21 +1,23 @@ package core.messaging +import com.google.common.util.concurrent.ListenableFuture import contracts.Cash import contracts.CommercialPaper -import core.* import core.contracts.* import core.crypto.Party import core.crypto.SecureHash +import core.days import core.node.NodeConfiguration import core.node.NodeInfo import core.node.ServiceHub import core.node.services.NodeAttachmentService import core.node.services.ServiceType -import core.node.storage.CheckpointStorage import core.node.subsystems.NodeWalletService import core.node.subsystems.StorageServiceImpl import core.node.subsystems.Wallet import core.node.subsystems.WalletImpl +import core.random63BitValue +import core.seconds import core.testing.InMemoryMessagingNetwork import core.testing.MockNetwork import core.testutils.* @@ -79,7 +81,7 @@ class TwoPartyTradeProtocolTests { val buyerSessionID = random63BitValue() - val aliceResult = TwoPartyTradeProtocol.runSeller( + val aliceResult = runSeller( aliceNode.smm, notaryNode.info, bobNode.net.myAddress, @@ -88,7 +90,7 @@ class TwoPartyTradeProtocolTests { ALICE_KEY, buyerSessionID ) - val bobResult = TwoPartyTradeProtocol.runBuyer( + val bobResult = runBuyer( bobNode.smm, notaryNode.info, aliceNode.net.myAddress, @@ -104,8 +106,8 @@ class TwoPartyTradeProtocolTests { aliceNode.stop() bobNode.stop() - assertThat(aliceNode.storage.checkpointStorage.checkpoints).isEmpty() - assertThat(bobNode.storage.checkpointStorage.checkpoints).isEmpty() + assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty() + assertThat(bobNode.checkpointStorage.checkpoints).isEmpty() } } @@ -129,7 +131,7 @@ class TwoPartyTradeProtocolTests { val buyerSessionID = random63BitValue() - val aliceFuture = TwoPartyTradeProtocol.runSeller( + val aliceFuture = runSeller( aliceNode.smm, notaryNode.info, bobAddr, @@ -138,7 +140,7 @@ class TwoPartyTradeProtocolTests { ALICE_KEY, buyerSessionID ) - TwoPartyTradeProtocol.runBuyer( + runBuyer( bobNode.smm, notaryNode.info, aliceAddr, @@ -162,7 +164,7 @@ class TwoPartyTradeProtocolTests { pumpBob() // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. - assertThat(bobNode.storage.checkpointStorage.checkpoints).hasSize(1) + assertThat(bobNode.checkpointStorage.checkpoints).hasSize(1) // TODO: remove once validated transactions are persisted to disk val recordedTransactions = bobNode.storage.validatedTransactions @@ -208,15 +210,31 @@ class TwoPartyTradeProtocolTests { advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair) { // That constructs the storage service object in a customised way ... - override fun constructStorageService(attachments: NodeAttachmentService, checkpointStorage: CheckpointStorage, keypair: KeyPair, identity: Party): StorageServiceImpl { + override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl { // To use RecordingMaps instead of ordinary HashMaps. - return StorageServiceImpl(attachments, checkpointStorage, keypair, identity, { tableName -> name }) + return StorageServiceImpl(attachments, keypair, identity, { tableName -> name }) } } } }, true, name, keyPair) } + + private fun runSeller(smm: StateMachineManager, notary: NodeInfo, + otherSide: SingleMessageRecipient, assetToSell: StateAndRef, price: Amount, + myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture { + val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID) + return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", seller) + } + + private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo, + otherSide: SingleMessageRecipient, acceptablePrice: Amount, typeToBuy: Class, + sessionID: Long): ListenableFuture { + val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID) + return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyer) + } + + @Test fun checkDependenciesOfSaleAssetAreResolved() { transactionGroupFor { @@ -243,7 +261,7 @@ class TwoPartyTradeProtocolTests { net.runNetwork() // Clear network map registration messages - TwoPartyTradeProtocol.runSeller( + runSeller( aliceNode.smm, notaryNode.info, bobNode.net.myAddress, @@ -252,7 +270,7 @@ class TwoPartyTradeProtocolTests { ALICE_KEY, buyerSessionID ) - TwoPartyTradeProtocol.runBuyer( + runBuyer( bobNode.smm, notaryNode.info, aliceNode.net.myAddress, @@ -356,7 +374,7 @@ class TwoPartyTradeProtocolTests { net.runNetwork() // Clear network map registration messages - val aliceResult = TwoPartyTradeProtocol.runSeller( + val aliceResult = runSeller( aliceNode.smm, notaryNode.info, bobAddr, @@ -365,7 +383,7 @@ class TwoPartyTradeProtocolTests { ALICE_KEY, buyerSessionID ) - val bobResult = TwoPartyTradeProtocol.runBuyer( + val bobResult = runBuyer( bobNode.smm, notaryNode.info, aliceAddr,