Merge branch 'master' of https://github.com/corda/corda into christians_checkpoint_checker_thread

This commit is contained in:
Christian Sailer
2017-10-27 14:14:09 +01:00
35 changed files with 417 additions and 136 deletions

View File

@ -16,10 +16,7 @@ import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StateLoader
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
@ -803,9 +800,9 @@ abstract class AbstractNode(config: NodeConfiguration,
return flowFactories[initiatingFlowClass]
}
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
database.transaction {
super.recordTransactions(notifyVault, txs)
super.recordTransactions(statesToRecord, txs)
}
}

View File

@ -274,6 +274,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
/** @param rootPackageName only this package and subpackages may be extracted from [url], or null to allow all packages. */
private class RestrictedURL(val url: URL, rootPackageName: String?) {
val qualifiedNamePrefix = rootPackageName?.let { it + '.' } ?: ""
override fun toString() = url.toString()
}
private inner class RestrictedScanResult(private val scanResult: ScanResult, private val qualifiedNamePrefix: String) {

View File

@ -7,6 +7,7 @@ import net.corda.core.contracts.requireThat
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.ContractUpgradeUtils
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
@ -16,8 +17,7 @@ import net.corda.core.transactions.SignedTransaction
class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = subFlow(ReceiveTransactionFlow(sender))
serviceHub.recordTransactions(stx)
subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT))
}
}

View File

@ -15,6 +15,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.TransactionStorage
@ -93,7 +94,7 @@ interface ServiceHubInternal : ServiceHub {
val database: CordaPersistence
val configuration: NodeConfiguration
override val cordappProvider: CordappProviderInternal
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
require(txs.any()) { "No transactions passed in for recording" }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
@ -105,9 +106,43 @@ interface ServiceHubInternal : ServiceHub {
log.warn("Transactions recorded from outside of a state machine")
}
if (notifyVault) {
if (statesToRecord != StatesToRecord.NONE) {
val toNotify = recordedTransactions.map { if (it.isNotaryChangeTransaction()) it.notaryChangeTx else it.tx }
vaultService.notifyAll(toNotify)
// When the user has requested StatesToRecord.ALL we may end up recording and relationally mapping states
// that do not involve us and that we cannot sign for. This will break coin selection and thus a warning
// is present in the documentation for this feature (see the "Observer nodes" tutorial on docs.corda.net).
//
// The reason for this is three-fold:
//
// 1) We are putting in place the observer mode feature relatively quickly to meet specific customer
// launch target dates.
//
// 2) The right design for vaults which mix observations and relevant states isn't entirely clear yet.
//
// 3) If we get the design wrong it could create security problems and business confusions.
//
// Back in the bitcoinj days I did add support for "watching addresses" to the wallet code, which is the
// Bitcoin equivalent of observer nodes:
//
// https://bitcoinj.github.io/working-with-the-wallet#watching-wallets
//
// The ability to have a wallet containing both irrelevant and relevant states complicated everything quite
// dramatically, even methods as basic as the getBalance() API which required additional modes to let you
// query "balance I can spend" vs "balance I am observing". In the end it might have been better to just
// require the user to create an entirely separate wallet for observing with.
//
// In Corda we don't support a single node having multiple vaults (at the time of writing), and it's not
// clear that's the right way to go: perhaps adding an "origin" column to the VAULT_STATES table is a better
// solution. Then you could select subsets of states depending on where the report came from.
//
// The risk of doing this is that apps/developers may use 'canned SQL queries' not written by us that forget
// to add a WHERE clause for the origin column. Those queries will seem to work most of the time until
// they're run on an observer node and mix in irrelevant data. In the worst case this may result in
// erroneous data being reported to the user, which could cause security problems.
//
// Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely
// to make writes to the ledger very often or at all, we choose to punt this issue for the time being.
vaultService.notifyAll(statesToRecord, toNotify)
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.services.api
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultService
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
@ -12,8 +13,8 @@ interface VaultServiceInternal : VaultService {
* indicate whether an update consists entirely of regular or notary change transactions, which may require
* different processing logic.
*/
fun notifyAll(txns: Iterable<CoreTransaction>)
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>)
/** Same as notifyAll but with a single transaction. */
fun notify(tx: CoreTransaction) = notifyAll(listOf(tx))
fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx))
}

View File

@ -8,10 +8,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.utilities.UntrustworthyData
class FlowSessionImpl(
override val counterparty: Party
) : FlowSession() {
class FlowSessionImpl(override val counterparty: Party) : FlowSession() {
internal lateinit var stateMachine: FlowStateMachine<*>
internal lateinit var sessionFlow: FlowLogic<*>
@ -57,5 +54,7 @@ class FlowSessionImpl(
@Suspendable
override fun send(payload: Any) = send(payload, maySkipCheckpoint = false)
override fun toString() = "Flow session with $counterparty"
}

View File

@ -13,6 +13,7 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.messaging.DataFeed
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultQueryException
import net.corda.core.node.services.vault.*
import net.corda.core.schemas.PersistentStateRef
@ -50,8 +51,7 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
}
/**
* Currently, the node vault service is a very simple RDBMS backed implementation. It will change significantly when
* we add further functionality as the design for the vault and vault service matures.
* The vault service handles storage, retrieval and querying of states.
*
* This class needs database transactions to be in-flight during method calls and init, and will throw exceptions if
* this is not the case.
@ -59,8 +59,12 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
* TODO: have transaction storage do some caching.
*/
class NodeVaultService(private val clock: Clock, private val keyManagementService: KeyManagementService, private val stateLoader: StateLoader, hibernateConfig: HibernateConfiguration) : SingletonSerializeAsToken(), VaultServiceInternal {
class NodeVaultService(
private val clock: Clock,
private val keyManagementService: KeyManagementService,
private val stateLoader: StateLoader,
hibernateConfig: HibernateConfiguration
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
val log = loggerFor<NodeVaultService>()
}
@ -118,7 +122,10 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
override val updates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _updatesInDbTx }
override fun notifyAll(txns: Iterable<CoreTransaction>) {
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
if (statesToRecord == StatesToRecord.NONE)
return
// It'd be easier to just group by type, but then we'd lose ordering.
val regularTxns = mutableListOf<WireTransaction>()
val notaryChangeTxns = mutableListOf<NotaryChangeWireTransaction>()
@ -128,30 +135,33 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
is WireTransaction -> {
regularTxns.add(tx)
if (notaryChangeTxns.isNotEmpty()) {
notifyNotaryChange(notaryChangeTxns.toList())
notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
notaryChangeTxns.clear()
}
}
is NotaryChangeWireTransaction -> {
notaryChangeTxns.add(tx)
if (regularTxns.isNotEmpty()) {
notifyRegular(regularTxns.toList())
notifyRegular(regularTxns.toList(), statesToRecord)
regularTxns.clear()
}
}
}
}
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList())
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList())
if (regularTxns.isNotEmpty()) notifyRegular(regularTxns.toList(), statesToRecord)
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
}
private fun notifyRegular(txns: Iterable<WireTransaction>) {
private fun notifyRegular(txns: Iterable<WireTransaction>, statesToRecord: StatesToRecord) {
fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState> {
val myKeys = keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } })
val ourNewStates = tx.outputs.
filter { isRelevant(it.data, myKeys.toSet()) }.
map { tx.outRef<ContractState>(it.data) }
val ourNewStates = when (statesToRecord) {
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
StatesToRecord.ONLY_RELEVANT -> tx.outputs.filter { isRelevant(it.data, myKeys.toSet()) }
StatesToRecord.ALL_VISIBLE -> tx.outputs
}.map { tx.outRef<ContractState>(it.data) }
// Retrieve all unconsumed states for this transaction's inputs
val consumedStates = loadStates(tx.inputs)
@ -169,7 +179,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
processAndNotify(netDelta)
}
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>) {
private fun notifyNotaryChange(txns: Iterable<NotaryChangeWireTransaction>, statesToRecord: StatesToRecord) {
fun makeUpdate(tx: NotaryChangeWireTransaction): Vault.Update<ContractState> {
// We need to resolve the full transaction here because outputs are calculated from inputs
// We also can't do filtering beforehand, since output encumbrance pointers get recalculated based on
@ -178,7 +188,12 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
val myKeys = keyManagementService.filterMyKeys(ltx.outputs.flatMap { it.data.participants.map { it.owningKey } })
val (consumedStateAndRefs, producedStates) = ltx.inputs.
zip(ltx.outputs).
filter { (_, output) -> isRelevant(output.data, myKeys.toSet()) }.
filter { (_, output) ->
if (statesToRecord == StatesToRecord.ONLY_RELEVANT)
isRelevant(output.data, myKeys.toSet())
else
true
}.
unzip()
val producedStateAndRefs = producedStates.map { ltx.outRef<ContractState>(it.data) }

View File

@ -12,6 +12,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -107,11 +108,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
doReturn(myInfo).whenever(it).myInfo
doReturn(kms).whenever(it).keyManagementService
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider
doCallRealMethod().whenever(it).recordTransactions(any<SignedTransaction>())
doCallRealMethod().whenever(it).recordTransactions(any<Boolean>(), any<SignedTransaction>())
doCallRealMethod().whenever(it).recordTransactions(any(), any<Iterable<SignedTransaction>>())
doCallRealMethod().whenever(it).recordTransactions(any<StatesToRecord>(), any<Iterable<SignedTransaction>>())
doCallRealMethod().whenever(it).recordTransactions(any<Iterable<SignedTransaction>>())
doCallRealMethod().whenever(it).recordTransactions(any<SignedTransaction>(), anyVararg<SignedTransaction>())
doReturn(NodeVaultService(testClock, kms, stateLoader, database.hibernateConfig)).whenever(it).vaultService
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
}
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database)

View File

@ -5,13 +5,13 @@ import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.VaultService
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.CordaPersistence
@ -40,7 +40,6 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), ::makeTestIdentityService)
database.transaction {
services = object : MockServices(BOB_KEY) {
override val vaultService: VaultServiceInternal
get() {
@ -54,7 +53,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
vaultService.notifyAll(StatesToRecord.ONLY_RELEVANT, txs.map { it.tx })
}
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.node.StatesToRecord
import net.corda.core.utilities.toBase58String
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
@ -82,12 +83,12 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
hibernateConfig = database.hibernateConfig
services = object : MockServices(cordappPackages, BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
override val vaultService = makeVaultService(database.hibernateConfig)
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
vaultService.notifyAll(statesToRecord, txs.map { it.tx })
}
override fun jdbcSession() = database.createSession()

View File

@ -145,17 +145,7 @@ class FlowFrameworkTests {
val restoredFlow = bobNode.restartAndGetRestoredFlow<InitiatedReceiveFlow>()
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@Test
fun `flow added before network map does run after init`() {
val charlieNode = mockNet.createNode() //create vanilla node
val flow = NoOpFlow()
charlieNode.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
assertEquals(true, flow.flowStarted) // Now we should have run the flow
}
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }

View File

@ -1,16 +1,15 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.contracts.Issued
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.*
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.QueryCriteria.*
import net.corda.core.transactions.NotaryChangeWireTransaction
@ -30,7 +29,6 @@ import net.corda.node.utilities.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
@ -58,8 +56,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
@Before
fun setUp() {
LogHelper.setLevel(NodeVaultService::class)
val databaseAndServices = makeTestDatabaseAndMockServices(keys = listOf(BOC_KEY, DUMMY_CASH_ISSUER_KEY),
cordappPackages = cordappPackages)
val databaseAndServices = MockServices.makeTestDatabaseAndMockServices(
keys = listOf(BOC_KEY, DUMMY_CASH_ISSUER_KEY),
cordappPackages = cordappPackages
)
database = databaseAndServices.first
services = databaseAndServices.second
issuerServices = MockServices(cordappPackages, DUMMY_CASH_ISSUER_KEY, BOC_KEY)
@ -102,10 +102,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val originalVault = vaultService
val services2 = object : MockServices() {
override val vaultService: NodeVaultService get() = originalVault
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
vaultService.notify(statesToRecord, stx.tx)
}
}
}
@ -512,14 +512,14 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
}.toWireTransaction(services)
val cashState = StateAndRef(issueTx.outputs.single(), StateRef(issueTx.id, 0))
database.transaction { service.notify(issueTx) }
database.transaction { service.notify(StatesToRecord.ONLY_RELEVANT, issueTx) }
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null)
database.transaction {
val moveTx = TransactionBuilder(services.myInfo.chooseIdentity()).apply {
Cash.generateSpend(services, this, Amount(1000, GBP), thirdPartyIdentity)
}.toWireTransaction(services)
service.notify(moveTx)
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
}
val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null)
@ -556,7 +556,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val cashStateWithNewNotary = StateAndRef(initialCashState.state.copy(notary = newNotary), StateRef(changeNotaryTx.id, 0))
database.transaction {
service.notifyAll(listOf(issueStx.tx, changeNotaryTx))
service.notifyAll(StatesToRecord.ONLY_RELEVANT, listOf(issueStx.tx, changeNotaryTx))
}
// Move cash
@ -567,7 +567,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
}
database.transaction {
service.notify(moveTx)
service.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
}
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null)
@ -577,4 +577,32 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val observedUpdates = vaultSubscriber.onNextEvents
assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate))
}
@Test
fun observerMode() {
fun countCash(): Long {
return database.transaction {
vaultService.queryBy(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(), PageSpecification(1)).totalStatesAvailable
}
}
val currentCashStates = countCash()
// Send some minimalist dummy transaction.
val txb = TransactionBuilder(DUMMY_NOTARY)
txb.addOutputState(Cash.State(MEGA_CORP.ref(0), 100.DOLLARS, MINI_CORP), Cash::class.java.name)
txb.addCommand(Cash.Commands.Move(), MEGA_CORP_PUBKEY)
val wtx = txb.toWireTransaction(services)
database.transaction {
vaultService.notify(StatesToRecord.ONLY_RELEVANT, wtx)
}
// Check that it was ignored as irrelevant.
assertEquals(currentCashStates, countCash())
// Now try again and check it was accepted.
database.transaction {
vaultService.notify(StatesToRecord.ALL_VISIBLE, wtx)
}
assertEquals(currentCashStates + 1, countCash())
}
}