Push internal subsystems into node

This commit is contained in:
Matthew Nesbit
2016-05-18 10:15:03 +01:00
parent 5e70646bd2
commit c8130581a9
8 changed files with 45 additions and 45 deletions

View File

@ -9,6 +9,7 @@ import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import core.node.ServiceHub
import core.node.storage.Checkpoint
import core.node.storage.CheckpointStorage
import core.protocols.ProtocolLogic
import core.protocols.ProtocolStateMachine
import core.protocols.ProtocolStateMachineImpl
@ -53,14 +54,13 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExecutor) {
class StateMachineManager(val serviceHub: ServiceHub, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
val scheduler = FiberScheduler()
// This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect
// them across node restarts.
private val checkpointStorage = serviceHub.storageService.checkpointStorage
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val stateMachines = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, Checkpoint>())

View File

@ -78,6 +78,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
open fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
lateinit var storage: StorageService
lateinit var checkpointStorage: CheckpointStorage
lateinit var smm: StateMachineManager
lateinit var wallet: WalletService
lateinit var keyManagement: E2ETestKeyManagementService
@ -99,9 +100,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
require(!started) { "Node has already been started" }
log.info("Node starting up ...")
storage = initialiseStorageService(dir)
val storageServices = initialiseStorageService(dir)
storage = storageServices.first
checkpointStorage = storageServices.second
net = makeMessagingService()
smm = StateMachineManager(services, serverThread)
smm = StateMachineManager(services, checkpointStorage, serverThread)
wallet = NodeWalletService(services)
keyManagement = E2ETestKeyManagementService()
makeInterestRatesOracleService()
@ -216,16 +219,16 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
protected abstract fun startMessagingService()
protected open fun initialiseStorageService(dir: Path): StorageService {
protected open fun initialiseStorageService(dir: Path): Pair<StorageService,CheckpointStorage> {
val attachments = makeAttachmentStorage(dir)
val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints"))
_servicesThatAcceptUploads += attachments
val (identity, keypair) = obtainKeyPair(dir)
return constructStorageService(attachments, checkpointStorage, keypair, identity)
return Pair(constructStorageService(attachments, keypair, identity),checkpointStorage)
}
protected open fun constructStorageService(attachments: NodeAttachmentService, checkpointStorage: CheckpointStorage, keypair: KeyPair, identity: Party) =
StorageServiceImpl(attachments, checkpointStorage, keypair, identity)
protected open fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party) =
StorageServiceImpl(attachments, keypair, identity)
private fun obtainKeyPair(dir: Path): Pair<Party, KeyPair> {
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that

View File

@ -0,0 +1,43 @@
package core.node.storage
import core.crypto.sha256
import core.protocols.ProtocolStateMachine
import core.serialization.SerializedBytes
/**
* Thread-safe storage of fiber checkpoints.
*
* TODO: Make internal to node again once split [ServiceHub] into a public (to contracts etc) and private (to node) view
*/
interface CheckpointStorage {
/**
* Add a new checkpoint to the store.
*/
fun addCheckpoint(checkpoint: Checkpoint)
/**
* Remove existing checkpoint from the store. It is an error to attempt to remove a checkpoint which doesn't exist
* in the store. Doing so will throw an [IllegalArgumentException].
*/
fun removeCheckpoint(checkpoint: Checkpoint)
/**
* Returns a snapshot of all the checkpoints in the store.
* This may return more checkpoints than were added to this instance of the store; for example if the store persists
* checkpoints to disk.
*/
val checkpoints: Iterable<Checkpoint>
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
data class Checkpoint(
val serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>,
val awaitingTopic: String,
val awaitingObjectOfType: String // java class name
) {
override fun toString(): String {
return "Checkpoint(#serialisedFiber=${serialisedFiber.sha256()}, awaitingTopic=$awaitingTopic, awaitingObjectOfType=$awaitingObjectOfType)"
}
}

View File

@ -11,7 +11,6 @@ import java.security.KeyPair
import java.util.*
open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val checkpointStorage: CheckpointStorage,
override val myLegalIdentityKey: KeyPair,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public),
// This parameter is for unit tests that want to observe operation details.

View File

@ -1,307 +0,0 @@
package protocols
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import contracts.Cash
import contracts.sumCashBy
import core.contracts.*
import core.crypto.DigitalSignature
import core.crypto.Party
import core.crypto.signWithECDSA
import core.messaging.SingleMessageRecipient
import core.messaging.StateMachineManager
import core.node.NodeInfo
import core.protocols.ProtocolLogic
import core.random63BitValue
import core.seconds
import core.utilities.ProgressTracker
import core.utilities.trace
import java.security.KeyPair
import java.security.PublicKey
import java.security.SignatureException
/**
* This asset trading protocol implements a "delivery vs payment" type swap. It has two parties (B and S for buyer
* and seller) and the following steps:
*
* 1. S sends the [StateAndRef] pointing to what they want to sell to B, along with info about the price they require
* B to pay. For example this has probably been agreed on an exchange.
* 2. B sends to S a [SignedTransaction] that includes the state as input, B's cash as input, the state with the new
* owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
* it lacks a signature from S authorising movement of the asset.
* 3. S signs it and hands the now finalised SignedWireTransaction back to B.
*
* Assuming no malicious termination, they both end the protocol being in posession of a valid, signed transaction
* that represents an atomic asset swap.
*
* Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
*
* To initiate the protocol, use either the [runBuyer] or [runSeller] methods, depending on which side of the trade
* your node is taking. These methods return a future which will complete once the trade is over and a fully signed
* transaction is available: you can either block your thread waiting for the protocol to complete by using
* [ListenableFuture.get] or more usefully, register a callback that will be invoked when the time comes.
*
* To see an example of how to use this class, look at the unit tests.
*/
object TwoPartyTradeProtocol {
val TRADE_TOPIC = "platform.trade"
fun runSeller(smm: StateMachineManager, notary: NodeInfo,
otherSide: SingleMessageRecipient, assetToSell: StateAndRef<OwnableState>, price: Amount,
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
val seller = Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
return smm.add("${TRADE_TOPIC}.seller", seller)
}
fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
otherSide: SingleMessageRecipient, acceptablePrice: Amount, typeToBuy: Class<out OwnableState>,
sessionID: Long): ListenableFuture<SignedTransaction> {
val buyer = Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
return smm.add("$TRADE_TOPIC.buyer", buyer)
}
class UnacceptablePriceException(val givenPrice: Amount) : Exception()
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() {
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
}
// This object is serialised to the network and is the first protocol message the seller sends to the buyer.
class SellerTradeInfo(
val assetForSale: StateAndRef<OwnableState>,
val price: Amount,
val sellerOwnerKey: PublicKey,
val sessionID: Long
)
class SignaturesFromSeller(val sellerSig: DigitalSignature.WithKey,
val notarySig: DigitalSignature.LegallyIdentifiable)
open class Seller(val otherSide: SingleMessageRecipient,
val notaryNode: NodeInfo,
val assetToSell: StateAndRef<OwnableState>,
val price: Amount,
val myKeyPair: KeyPair,
val buyerSessionID: Long,
override val progressTracker: ProgressTracker = Seller.tracker()) : ProtocolLogic<SignedTransaction>() {
companion object {
object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal")
object VERIFYING : ProgressTracker.Step("Verifying transaction proposal")
object SIGNING : ProgressTracker.Step("Signing transaction")
object NOTARY : ProgressTracker.Step("Getting notary signature")
object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer")
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS)
}
@Suspendable
override fun call(): SignedTransaction {
val partialTX: SignedTransaction = receiveAndCheckProposedTransaction()
// These two steps could be done in parallel, in theory. Our framework doesn't support that yet though.
val ourSignature = signWithOurKey(partialTX)
val notarySignature = getNotarySignature(partialTX)
return sendSignatures(partialTX, ourSignature, notarySignature)
}
@Suspendable
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = NOTARY
return subProtocol(NotaryProtocol(stx.tx))
}
@Suspendable
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
progressTracker.currentStep = AWAITING_PROPOSAL
val sessionID = random63BitValue()
// Make the first message we'll send to kick off the protocol.
val hello = SellerTradeInfo(assetToSell, price, myKeyPair.public, sessionID)
val maybeSTX = sendAndReceive<SignedTransaction>(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello)
progressTracker.currentStep = VERIFYING
maybeSTX.validate {
progressTracker.nextStep()
// Check that the tx proposed by the buyer is valid.
val missingSigs = it.verify(throwIfSignaturesAreMissing = false)
if (missingSigs != setOf(myKeyPair.public, notaryNode.identity.owningKey))
throw SignatureException("The set of missing signatures is not as expected: $missingSigs")
val wtx: WireTransaction = it.tx
logger.trace { "Received partially signed transaction: ${it.id}" }
checkDependencies(it)
// This verifies that the transaction is contract-valid, even though it is missing signatures.
serviceHub.verifyTransaction(wtx.toLedgerTransaction(serviceHub.identityService, serviceHub.storageService.attachments))
if (wtx.outputs.sumCashBy(myKeyPair.public) != price)
throw IllegalArgumentException("Transaction is not sending us the right amount of cash")
// There are all sorts of funny games a malicious secondary might play here, we should fix them:
//
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
// we're reusing keys! So don't reuse keys!
// - This tx may include output states that impose odd conditions on the movement of the cash,
// once we implement state pairing.
//
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
// express protocol state machines on top of the messaging layer.
return it
}
}
@Suspendable
private fun checkDependencies(stx: SignedTransaction) {
// Download and check all the transactions that this transaction depends on, but do not check this
// transaction itself.
val dependencyTxIDs = stx.tx.inputs.map { it.txhash }.toSet()
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
}
open fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey {
progressTracker.currentStep = SIGNING
return myKeyPair.signWithECDSA(partialTX.txBits)
}
@Suspendable
private fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey,
notarySignature: DigitalSignature.LegallyIdentifiable): SignedTransaction {
progressTracker.currentStep = SENDING_SIGS
val fullySigned = partialTX + ourSignature + notarySignature
logger.trace { "Built finished transaction, sending back to secondary!" }
send(TRADE_TOPIC, otherSide, buyerSessionID, SignaturesFromSeller(ourSignature, notarySignature))
return fullySigned
}
}
open class Buyer(val otherSide: SingleMessageRecipient,
val notary: Party,
val acceptablePrice: Amount,
val typeToBuy: Class<out OwnableState>,
val sessionID: Long) : ProtocolLogic<SignedTransaction>() {
object RECEIVING : ProgressTracker.Step("Waiting for seller trading info")
object VERIFYING : ProgressTracker.Step("Verifying seller assets")
object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal")
object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller")
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES)
@Suspendable
override fun call(): SignedTransaction {
val tradeRequest = receiveAndValidateTradeRequest()
progressTracker.currentStep = SIGNING
val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest)
val stx = signWithOurKeys(cashSigningPubKeys, ptx)
// exitProcess(0)
val signatures = swapSignaturesWithSeller(stx, tradeRequest.sessionID)
logger.trace { "Got signatures from seller, verifying ... " }
// TODO: figure out a way to do Notary verification along with other command signatures in SignedTransaction.verify()
verifyCorrectNotary(stx.tx, signatures.notarySig)
val fullySigned = stx + signatures.sellerSig + signatures.notarySig
fullySigned.verify()
logger.trace { "Signatures received are valid. Trade complete! :-)" }
return fullySigned
}
@Suspendable
private fun receiveAndValidateTradeRequest(): SellerTradeInfo {
progressTracker.currentStep = RECEIVING
// Wait for a trade request to come in on our pre-provided session ID.
val maybeTradeRequest = receive<SellerTradeInfo>(TRADE_TOPIC, sessionID)
progressTracker.currentStep = VERIFYING
maybeTradeRequest.validate {
// What is the seller trying to sell us?
val asset = it.assetForSale.state
val assetTypeName = asset.javaClass.name
logger.trace { "Got trade request for a $assetTypeName: ${it.assetForSale}" }
// Check the start message for acceptability.
check(it.sessionID > 0)
if (it.price > acceptablePrice)
throw UnacceptablePriceException(it.price)
if (!typeToBuy.isInstance(asset))
throw AssetMismatchException(typeToBuy.name, assetTypeName)
// Check the transaction that contains the state which is being resolved.
// We only have a hash here, so if we don't know it already, we have to ask for it.
subProtocol(ResolveTransactionsProtocol(setOf(it.assetForSale.ref.txhash), otherSide))
return it
}
}
@Suspendable
private fun swapSignaturesWithSeller(stx: SignedTransaction, theirSessionID: Long): SignaturesFromSeller {
progressTracker.currentStep = SWAPPING_SIGNATURES
logger.trace { "Sending partially signed transaction to seller" }
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
return sendAndReceive<SignaturesFromSeller>(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx).validate { it }
}
private fun signWithOurKeys(cashSigningPubKeys: List<PublicKey>, ptx: TransactionBuilder): SignedTransaction {
// Now sign the transaction with whatever keys we need to move the cash.
for (k in cashSigningPubKeys) {
val priv = serviceHub.keyManagementService.toPrivate(k)
ptx.signWith(KeyPair(k, priv))
}
return ptx.toSignedTransaction(checkSufficientSignatures = false)
}
private fun verifyCorrectNotary(wtx: WireTransaction, sig: DigitalSignature.LegallyIdentifiable) {
val notary = serviceHub.loadState(wtx.inputs.first()).notary
check(sig.signer == notary) { "Transaction not signed by the required Notary" }
}
private fun assembleSharedTX(tradeRequest: SellerTradeInfo): Pair<TransactionBuilder, List<PublicKey>> {
val ptx = TransactionBuilder()
// Add input and output states for the movement of cash, by using the Cash contract to generate the states.
val wallet = serviceHub.walletService.currentWallet
val cashStates = wallet.statesOfType<Cash.State>()
val cashSigningPubKeys = Cash().generateSpend(ptx, tradeRequest.price, tradeRequest.sellerOwnerKey, cashStates)
// Add inputs/outputs/a command for the movement of the asset.
ptx.addInputState(tradeRequest.assetForSale.ref)
// Just pick some new public key for now. This won't be linked with our identity in any way, which is what
// we want for privacy reasons: the key is here ONLY to manage and control ownership, it is not intended to
// reveal who the owner actually is. The key management service is expected to derive a unique key from some
// initial seed in order to provide privacy protection.
val freshKey = serviceHub.keyManagementService.freshKey()
val (command, state) = tradeRequest.assetForSale.state.withNewOwner(freshKey.public)
ptx.addOutputState(state)
ptx.addCommand(command, tradeRequest.assetForSale.state.owner)
// And add a request for timestamping: it may be that none of the contracts need this! But it can't hurt
// to have one.
val currentTime = serviceHub.clock.instant()
ptx.setTime(currentTime, notary, 30.seconds)
return Pair(ptx, cashSigningPubKeys)
}
}
}

View File

@ -91,7 +91,7 @@ class MockCheckpointStorage : CheckpointStorage {
@ThreadSafe
class MockStorageService : StorageServiceImpl(MockAttachmentStorage(), MockCheckpointStorage(), generateKeyPair())
class MockStorageService : StorageServiceImpl(MockAttachmentStorage(), generateKeyPair())
class MockServices(
customWallet: WalletService? = null,

View File

@ -1,21 +1,23 @@
package core.messaging
import com.google.common.util.concurrent.ListenableFuture
import contracts.Cash
import contracts.CommercialPaper
import core.*
import core.contracts.*
import core.crypto.Party
import core.crypto.SecureHash
import core.days
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.ServiceHub
import core.node.services.NodeAttachmentService
import core.node.services.ServiceType
import core.node.storage.CheckpointStorage
import core.node.subsystems.NodeWalletService
import core.node.subsystems.StorageServiceImpl
import core.node.subsystems.Wallet
import core.node.subsystems.WalletImpl
import core.random63BitValue
import core.seconds
import core.testing.InMemoryMessagingNetwork
import core.testing.MockNetwork
import core.testutils.*
@ -79,7 +81,7 @@ class TwoPartyTradeProtocolTests {
val buyerSessionID = random63BitValue()
val aliceResult = TwoPartyTradeProtocol.runSeller(
val aliceResult = runSeller(
aliceNode.smm,
notaryNode.info,
bobNode.net.myAddress,
@ -88,7 +90,7 @@ class TwoPartyTradeProtocolTests {
ALICE_KEY,
buyerSessionID
)
val bobResult = TwoPartyTradeProtocol.runBuyer(
val bobResult = runBuyer(
bobNode.smm,
notaryNode.info,
aliceNode.net.myAddress,
@ -104,8 +106,8 @@ class TwoPartyTradeProtocolTests {
aliceNode.stop()
bobNode.stop()
assertThat(aliceNode.storage.checkpointStorage.checkpoints).isEmpty()
assertThat(bobNode.storage.checkpointStorage.checkpoints).isEmpty()
assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty()
assertThat(bobNode.checkpointStorage.checkpoints).isEmpty()
}
}
@ -129,7 +131,7 @@ class TwoPartyTradeProtocolTests {
val buyerSessionID = random63BitValue()
val aliceFuture = TwoPartyTradeProtocol.runSeller(
val aliceFuture = runSeller(
aliceNode.smm,
notaryNode.info,
bobAddr,
@ -138,7 +140,7 @@ class TwoPartyTradeProtocolTests {
ALICE_KEY,
buyerSessionID
)
TwoPartyTradeProtocol.runBuyer(
runBuyer(
bobNode.smm,
notaryNode.info,
aliceAddr,
@ -162,7 +164,7 @@ class TwoPartyTradeProtocolTests {
pumpBob()
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
assertThat(bobNode.storage.checkpointStorage.checkpoints).hasSize(1)
assertThat(bobNode.checkpointStorage.checkpoints).hasSize(1)
// TODO: remove once validated transactions are persisted to disk
val recordedTransactions = bobNode.storage.validatedTransactions
@ -208,15 +210,31 @@ class TwoPartyTradeProtocolTests {
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair) {
// That constructs the storage service object in a customised way ...
override fun constructStorageService(attachments: NodeAttachmentService, checkpointStorage: CheckpointStorage, keypair: KeyPair, identity: Party): StorageServiceImpl {
override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl {
// To use RecordingMaps instead of ordinary HashMaps.
return StorageServiceImpl(attachments, checkpointStorage, keypair, identity, { tableName -> name })
return StorageServiceImpl(attachments, keypair, identity, { tableName -> name })
}
}
}
}, true, name, keyPair)
}
private fun runSeller(smm: StateMachineManager, notary: NodeInfo,
otherSide: SingleMessageRecipient, assetToSell: StateAndRef<OwnableState>, price: Amount,
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", seller)
}
private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
otherSide: SingleMessageRecipient, acceptablePrice: Amount, typeToBuy: Class<out OwnableState>,
sessionID: Long): ListenableFuture<SignedTransaction> {
val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyer)
}
@Test
fun checkDependenciesOfSaleAssetAreResolved() {
transactionGroupFor<ContractState> {
@ -243,7 +261,7 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() // Clear network map registration messages
TwoPartyTradeProtocol.runSeller(
runSeller(
aliceNode.smm,
notaryNode.info,
bobNode.net.myAddress,
@ -252,7 +270,7 @@ class TwoPartyTradeProtocolTests {
ALICE_KEY,
buyerSessionID
)
TwoPartyTradeProtocol.runBuyer(
runBuyer(
bobNode.smm,
notaryNode.info,
aliceNode.net.myAddress,
@ -356,7 +374,7 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() // Clear network map registration messages
val aliceResult = TwoPartyTradeProtocol.runSeller(
val aliceResult = runSeller(
aliceNode.smm,
notaryNode.info,
bobAddr,
@ -365,7 +383,7 @@ class TwoPartyTradeProtocolTests {
ALICE_KEY,
buyerSessionID
)
val bobResult = TwoPartyTradeProtocol.runBuyer(
val bobResult = runBuyer(
bobNode.smm,
notaryNode.info,
aliceAddr,