mirror of
https://github.com/corda/corda.git
synced 2025-03-14 00:06:45 +00:00
resolved conflicts
This commit is contained in:
commit
b25892e45b
@ -2123,7 +2123,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService
|
||||
public <init>()
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow)
|
||||
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
|
||||
@org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
|
||||
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
|
||||
@ -2139,7 +2139,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError()
|
||||
##
|
||||
public interface net.corda.core.node.services.UniquenessProvider
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow)
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object
|
||||
public <init>(Map)
|
||||
|
@ -48,7 +48,7 @@ buildscript {
|
||||
* The issue has been reported to upstream:
|
||||
* https://issues.apache.org/jira/browse/ARTEMIS-1559
|
||||
*/
|
||||
ext.artemis_version = '2.2.0'
|
||||
ext.artemis_version = '2.5.0'
|
||||
ext.jackson_version = '2.9.3'
|
||||
ext.jetty_version = '9.4.7.v20170914'
|
||||
ext.jersey_version = '2.25'
|
||||
@ -61,7 +61,7 @@ buildscript {
|
||||
ext.caffeine_version = constants.getProperty("caffeineVersion")
|
||||
ext.metrics_version = constants.getProperty("metricsVersion")
|
||||
ext.okhttp_version = '3.5.0'
|
||||
ext.netty_version = '4.1.9.Final'
|
||||
ext.netty_version = '4.1.22.Final'
|
||||
ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion")
|
||||
ext.fileupload_version = '1.3.3'
|
||||
ext.junit_version = '4.12'
|
||||
|
@ -72,7 +72,7 @@ const val MAX_ISSUER_REF_SIZE = 512
|
||||
* cares about specific issuers with code that will accept any, or which is imposing issuer constraints via some
|
||||
* other mechanism and the additional type safety is not wanted.
|
||||
*/
|
||||
fun <T : Any> Amount<Issued<T>>.withoutIssuer(): Amount<T> = Amount(quantity, token.product)
|
||||
fun <T : Any> Amount<Issued<T>>.withoutIssuer(): Amount<T> = Amount(quantity, displayTokenSize, token.product)
|
||||
|
||||
// DOCSTART 3
|
||||
|
||||
|
@ -156,8 +156,8 @@ class NotaryFlow {
|
||||
try {
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
service.validateTimeWindow(parts.timestamp)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature)
|
||||
checkNotary(parts.notary)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
|
@ -1,9 +1,14 @@
|
||||
package net.corda.core.internal
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.isFulfilledBy
|
||||
import net.corda.core.flows.NotarisationResponse
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Checks that there are sufficient signatures to satisfy the notary signing requirement and validates the signatures
|
||||
@ -13,4 +18,19 @@ fun NotarisationResponse.validateSignatures(txId: SecureHash, notary: Party) {
|
||||
val signingKeys = signatures.map { it.by }
|
||||
require(notary.owningKey.isFulfilledBy(signingKeys)) { "Insufficient signatures to fulfill the notary signing requirement for $notary" }
|
||||
signatures.forEach { it.verify(txId) }
|
||||
}
|
||||
|
||||
/** Checks if the provided states were used as inputs in the specified transaction. */
|
||||
fun isConsumedByTheSameTx(txIdHash: SecureHash, consumedStates: Map<StateRef, StateConsumptionDetails>): Boolean {
|
||||
val conflicts = consumedStates.filter { (_, cause) ->
|
||||
cause.hashOfTransactionId != txIdHash
|
||||
}
|
||||
return conflicts.isEmpty()
|
||||
}
|
||||
|
||||
/** Returns [NotaryError.TimeWindowInvalid] if [currentTime] is outside the [timeWindow], and *null* otherwise. */
|
||||
fun validateTimeWindow(currentTime: Instant, timeWindow: TimeWindow?): NotaryError.TimeWindowInvalid? {
|
||||
return if (timeWindow != null && currentTime !in timeWindow) {
|
||||
NotaryError.TimeWindowInvalid(currentTime, timeWindow)
|
||||
} else null
|
||||
}
|
@ -27,6 +27,7 @@ abstract class NotaryService : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
@Deprecated("No longer used")
|
||||
const val ID_PREFIX = "corda.notary."
|
||||
|
||||
@Deprecated("No longer used")
|
||||
fun constructId(validating: Boolean, raft: Boolean = false, bft: Boolean = false, custom: Boolean = false): String {
|
||||
require(Booleans.countTrue(raft, bft, custom) <= 1) { "At most one of raft, bft or custom may be true" }
|
||||
@ -89,9 +90,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) {
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
|
||||
try {
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature)
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow)
|
||||
} catch (e: NotaryInternalException) {
|
||||
if (e.error is NotaryError.Conflict) {
|
||||
val conflicts = inputs.filterIndexed { _, stateRef ->
|
||||
|
@ -12,6 +12,7 @@ package net.corda.core.node.services
|
||||
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
@ -25,7 +26,13 @@ import net.corda.core.serialization.CordaSerializable
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature)
|
||||
fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow? = null
|
||||
)
|
||||
|
||||
/** Specifies the consuming transaction for every conflicting state. */
|
||||
@CordaSerializable
|
||||
|
@ -11,7 +11,6 @@
|
||||
package net.corda.behave.scenarios
|
||||
|
||||
import cucumber.api.java.After
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.network.Network
|
||||
import net.corda.behave.node.Node
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
@ -20,8 +19,6 @@ import java.time.Duration
|
||||
|
||||
class ScenarioState {
|
||||
|
||||
private val log = getLogger<ScenarioState>()
|
||||
|
||||
private val nodes = mutableListOf<Node.Builder>()
|
||||
|
||||
private var network: Network? = null
|
||||
|
@ -10,13 +10,13 @@
|
||||
|
||||
package net.corda.behave.scenarios.helpers
|
||||
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.scenarios.ScenarioState
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.contextLogger
|
||||
|
||||
abstract class Substeps(protected val state: ScenarioState) {
|
||||
|
||||
protected val log = getLogger<Substeps>()
|
||||
protected val log = contextLogger()
|
||||
|
||||
protected fun withNetwork(action: ScenarioState.() -> Unit) =
|
||||
state.withNetwork(action)
|
||||
|
@ -51,8 +51,9 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
|
||||
cashSelectionAlgo?.let {
|
||||
instance.set(cashSelectionAlgo)
|
||||
cashSelectionAlgo
|
||||
} ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver ($_metadata)." +
|
||||
"\nPlease specify an implementation in META-INF/services/${AbstractCashSelection::class.java}")
|
||||
} ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver name '${_metadata.driverName}'." +
|
||||
"\nPlease specify an implementation in META-INF/services/${AbstractCashSelection::class.qualifiedName}." +
|
||||
"\nAvailable implementations: $cashSelectionAlgos")
|
||||
}.invoke()
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ class CashSelectionH2Impl : AbstractCashSelection() {
|
||||
return metadata.driverName == JDBC_DRIVER_NAME
|
||||
}
|
||||
|
||||
override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME"
|
||||
override fun toString() = "${this::class.qualifiedName} for '$JDBC_DRIVER_NAME'"
|
||||
|
||||
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
|
||||
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
|
||||
|
@ -1,37 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.finance.contracts.asset.cash.selection
|
||||
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import java.sql.Connection
|
||||
import java.sql.DatabaseMetaData
|
||||
import java.sql.ResultSet
|
||||
import java.util.*
|
||||
|
||||
class CashSelectionMySQLImpl : AbstractCashSelection() {
|
||||
|
||||
companion object {
|
||||
const val JDBC_DRIVER_NAME = "MySQL JDBC Driver"
|
||||
}
|
||||
|
||||
override fun isCompatible(metadata: DatabaseMetaData): Boolean {
|
||||
return metadata.driverName == JDBC_DRIVER_NAME
|
||||
}
|
||||
|
||||
override fun executeQuery(statement: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, issuerKeysStr: Set<AbstractParty>, issuerRefsStr: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean {
|
||||
TODO("MySQL cash selection not implemented")
|
||||
}
|
||||
|
||||
override fun toString() = "${this::class.java} for ${CashSelectionH2Impl.JDBC_DRIVER_NAME}"
|
||||
}
|
@ -23,7 +23,7 @@ import java.util.*
|
||||
class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
||||
|
||||
companion object {
|
||||
val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver"
|
||||
const val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver"
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
||||
return metadata.driverName == JDBC_DRIVER_NAME
|
||||
}
|
||||
|
||||
override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME"
|
||||
override fun toString() = "${this::class.qualifiedName} for '$JDBC_DRIVER_NAME'"
|
||||
|
||||
// This is using PostgreSQL window functions for selecting a minimum set of rows that match a request amount of coins:
|
||||
// 1) This may also be possible with user-defined functions (e.g. using PL/pgSQL)
|
||||
|
@ -9,7 +9,6 @@
|
||||
#
|
||||
|
||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
|
||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionMySQLImpl
|
||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionPostgreSQLImpl
|
||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionSQLServerImpl
|
||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionOracleImpl
|
||||
|
@ -30,6 +30,9 @@ dependencies {
|
||||
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
|
||||
compile "org.apache.activemq:artemis-commons:${artemis_version}"
|
||||
|
||||
// Netty: All of it.
|
||||
compile "io.netty:netty-all:$netty_version"
|
||||
|
||||
// For adding serialisation of file upload streams to RPC
|
||||
// TODO: Remove this dependency and the code that requires it
|
||||
compile "commons-fileupload:commons-fileupload:$fileupload_version"
|
||||
|
@ -157,12 +157,9 @@ dependencies {
|
||||
compileOnly "co.paralleluniverse:capsule:$capsule_version"
|
||||
|
||||
// Java Atomix: RAFT library
|
||||
compile 'io.atomix.copycat:copycat-client:1.2.3'
|
||||
compile 'io.atomix.copycat:copycat-server:1.2.3'
|
||||
compile 'io.atomix.catalyst:catalyst-netty:1.1.2'
|
||||
|
||||
// Netty: All of it.
|
||||
compile "io.netty:netty-all:$netty_version"
|
||||
compile 'io.atomix.copycat:copycat-client:1.2.8'
|
||||
compile 'io.atomix.copycat:copycat-server:1.2.8'
|
||||
compile 'io.atomix.catalyst:catalyst-netty:1.2.1'
|
||||
|
||||
// OkHTTP: Simple HTTP library.
|
||||
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
|
||||
|
@ -15,8 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
@ -41,97 +41,85 @@ import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.node.TestClock
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import org.hamcrest.Matchers.instanceOf
|
||||
import org.junit.*
|
||||
import org.junit.Assert.assertThat
|
||||
import java.nio.file.Paths
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class BFTNotaryServiceTests : IntegrationTest() {
|
||||
class BFTNotaryServiceTests {
|
||||
companion object {
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var notary: Party
|
||||
private lateinit var node: StartedNode<MockNode>
|
||||
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas("node_0", "node_1", "node_2", "node_3", "node_4", "node_5",
|
||||
"node_6", "node_7", "node_8", "node_9")
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var notary: Party
|
||||
private lateinit var node: StartedNode<MockNode>
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
|
||||
}
|
||||
|
||||
@After
|
||||
fun stopNodes() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
private fun startBftClusterAndNode(clusterSize: Int, exposeRaces: Boolean = false) {
|
||||
(Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists?
|
||||
val replicaIds = (0 until clusterSize)
|
||||
|
||||
notary = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
|
||||
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
|
||||
CordaX500Name("BFT", "Zurich", "CH"))
|
||||
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notary, false))))
|
||||
|
||||
val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
|
||||
|
||||
val nodes = replicaIds.map { replicaId ->
|
||||
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
|
||||
val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces))
|
||||
doReturn(notary).whenever(it).notary
|
||||
}))
|
||||
} + mockNet.createUnstartedNode()
|
||||
|
||||
// MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the
|
||||
// network-parameters in their directories before they're started.
|
||||
node = nodes.map { node ->
|
||||
networkParameters.install(mockNet.baseDirectory(node.id))
|
||||
node.start()
|
||||
}.last()
|
||||
}
|
||||
|
||||
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
|
||||
@Test
|
||||
fun `all replicas start even if there is a new consensus during startup`() {
|
||||
startBftClusterAndNode(minClusterSize(1), exposeRaces = true) // This true adds a sleep to expose the race.
|
||||
val f = node.run {
|
||||
val trivialTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
// Create a new consensus while the redundant replica is sleeping:
|
||||
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
|
||||
@BeforeClass
|
||||
@JvmStatic
|
||||
fun before() {
|
||||
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
|
||||
val clusterSize = minClusterSize(1)
|
||||
val started = startBftClusterAndNode(clusterSize, mockNet)
|
||||
notary = started.first
|
||||
node = started.second
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@JvmStatic
|
||||
fun stopNodes() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
fun startBftClusterAndNode(clusterSize: Int, mockNet: InternalMockNetwork, exposeRaces: Boolean = false): Pair<Party, StartedNode<MockNode>> {
|
||||
(Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists?
|
||||
val replicaIds = (0 until clusterSize)
|
||||
|
||||
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
|
||||
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
|
||||
CordaX500Name("BFT", "Zurich", "CH"))
|
||||
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
|
||||
|
||||
val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
|
||||
|
||||
val nodes = replicaIds.map { replicaId ->
|
||||
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
|
||||
val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces))
|
||||
doReturn(notary).whenever(it).notary
|
||||
}))
|
||||
} + mockNet.createUnstartedNode()
|
||||
|
||||
// MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the
|
||||
// network-parameters in their directories before they're started.
|
||||
val node = nodes.map { node ->
|
||||
networkParameters.install(mockNet.baseDirectory(node.id))
|
||||
node.start()
|
||||
}.last()
|
||||
|
||||
return Pair(notaryIdentity, node)
|
||||
}
|
||||
mockNet.runNetwork()
|
||||
f.getOrThrow()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `detect double spend 1 faulty`() {
|
||||
detectDoubleSpend(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `detect double spend 2 faulty`() {
|
||||
detectDoubleSpend(2)
|
||||
}
|
||||
|
||||
private fun detectDoubleSpend(faultyReplicas: Int) {
|
||||
val clusterSize = minClusterSize(faultyReplicas)
|
||||
startBftClusterAndNode(clusterSize)
|
||||
fun `detect double spend`() {
|
||||
node.run {
|
||||
val issueTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
@ -152,7 +140,7 @@ class BFTNotaryServiceTests : IntegrationTest() {
|
||||
val successfulIndex = results.mapIndexedNotNull { index, result ->
|
||||
if (result is Try.Success) {
|
||||
val signers = result.value.map { it.by }
|
||||
assertEquals(minCorrectReplicas(clusterSize), signers.size)
|
||||
assertEquals(minCorrectReplicas(3), signers.size)
|
||||
signers.forEach {
|
||||
assertTrue(it in (notary.owningKey as CompositeKey).leafKeys)
|
||||
}
|
||||
@ -174,6 +162,63 @@ class BFTNotaryServiceTests : IntegrationTest() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transactions outside their time window are rejected`() {
|
||||
node.run {
|
||||
val issueTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
database.transaction {
|
||||
services.recordTransactions(issueTx)
|
||||
}
|
||||
val spendTx = signInitialTransaction(notary) {
|
||||
addInputState(issueTx.tx.outRef<ContractState>(0))
|
||||
setTimeWindow(TimeWindow.fromOnly(Instant.MAX))
|
||||
}
|
||||
val flow = NotaryFlow.Client(spendTx)
|
||||
val resultFuture = services.startFlow(flow).resultFuture
|
||||
mockNet.runNetwork()
|
||||
val exception = assertFailsWith<ExecutionException> { resultFuture.get() }
|
||||
assertThat(exception.cause, instanceOf(NotaryException::class.java))
|
||||
val error = (exception.cause as NotaryException).error
|
||||
assertThat(error, instanceOf(NotaryError.TimeWindowInvalid::class.java))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transactions can be re-notarised outside their time window`() {
|
||||
node.run {
|
||||
val issueTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
database.transaction {
|
||||
services.recordTransactions(issueTx)
|
||||
}
|
||||
val spendTx = signInitialTransaction(notary) {
|
||||
addInputState(issueTx.tx.outRef<ContractState>(0))
|
||||
setTimeWindow(TimeWindow.untilOnly(Instant.now() + Duration.ofHours(1)))
|
||||
}
|
||||
val resultFuture = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
val signatures = resultFuture.get()
|
||||
verifySignatures(signatures, spendTx.id)
|
||||
|
||||
for (node in mockNet.nodes) {
|
||||
(node.started!!.services.clock as TestClock).advanceBy(Duration.ofDays(1))
|
||||
}
|
||||
|
||||
val resultFuture2 = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
val signatures2 = resultFuture2.get()
|
||||
verifySignatures(signatures2, spendTx.id)
|
||||
}
|
||||
}
|
||||
|
||||
private fun verifySignatures(signatures: List<TransactionSignature>, txId: SecureHash) {
|
||||
notary.owningKey.isFulfilledBy(signatures.map { it.by })
|
||||
signatures.forEach { it.verify(txId) }
|
||||
}
|
||||
|
||||
private fun StartedNode<MockNode>.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
|
||||
return services.signInitialTransaction(
|
||||
TransactionBuilder(notary).apply {
|
||||
|
@ -0,0 +1,59 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode
|
||||
import net.corda.node.services.transactions.minClusterSize
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
||||
class BFTSMaRtTests {
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
|
||||
}
|
||||
|
||||
@After
|
||||
fun stopNodes() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
|
||||
@Test
|
||||
fun `all replicas start even if there is a new consensus during startup`() {
|
||||
val clusterSize = minClusterSize(1)
|
||||
val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race.
|
||||
val f = node.run {
|
||||
val trivialTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
// Create a new consensus while the redundant replica is sleeping:
|
||||
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
|
||||
}
|
||||
mockNet.runNetwork()
|
||||
f.getOrThrow()
|
||||
}
|
||||
|
||||
private fun StartedNode<MockNode>.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
|
||||
return services.signInitialTransaction(
|
||||
TransactionBuilder(notary).apply {
|
||||
addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey))
|
||||
block()
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
@ -8,7 +8,7 @@
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.services
|
||||
package net.corda.node.services.distributed
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.contracts.Amount
|
@ -149,9 +149,9 @@ class BFTNonValidatingNotaryService(
|
||||
val id = transaction.id
|
||||
val inputs = transaction.inputs
|
||||
val notary = transaction.notary
|
||||
if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow)
|
||||
val timeWindow = (transaction as? FilteredTransaction)?.timeWindow
|
||||
if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary)
|
||||
commitInputStates(inputs, id, callerIdentity.name, requestSignature)
|
||||
commitInputStates(inputs, id, callerIdentity.name, requestSignature, timeWindow)
|
||||
log.debug { "Inputs committed successfully, signing $id" }
|
||||
BFTSMaRt.ReplicaResponse.Signature(sign(id))
|
||||
} catch (e: NotaryInternalException) {
|
||||
|
@ -23,12 +23,15 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable
|
||||
import bftsmart.tom.server.defaultservices.DefaultReplier
|
||||
import bftsmart.tom.util.Extractor
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.internal.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.toTypedArray
|
||||
import net.corda.core.internal.validateTimeWindow
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -226,25 +229,27 @@ object BFTSMaRt {
|
||||
*/
|
||||
abstract fun executeCommand(command: ByteArray): ByteArray?
|
||||
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
|
||||
log.debug { "Attempting to commit inputs for transaction: $txId" }
|
||||
|
||||
val conflicts = mutableMapOf<StateRef, SecureHash>()
|
||||
services.database.transaction {
|
||||
logRequest(txId, callerName, requestSignature)
|
||||
states.forEach { state ->
|
||||
commitLog[state]?.let { conflicts[state] = it }
|
||||
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
|
||||
for (state in states) {
|
||||
commitLog[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.sha256()) }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
|
||||
states.forEach { stateRef ->
|
||||
commitLog[stateRef] = txId
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
if (!isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
|
||||
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
|
||||
throw NotaryInternalException(NotaryError.Conflict(txId, conflictingStates))
|
||||
}
|
||||
} else {
|
||||
log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
|
||||
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflict)
|
||||
throw NotaryInternalException(error)
|
||||
val outsideTimeWindowError = validateTimeWindow(services.clock.instant(), timeWindow)
|
||||
if (outsideTimeWindowError == null) {
|
||||
states.forEach { commitLog[it] = txId }
|
||||
log.debug { "Successfully committed all input states: $states" }
|
||||
} else {
|
||||
throw NotaryInternalException(outsideTimeWindowError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
@ -19,12 +20,15 @@ import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.validateTimeWindow
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
@ -74,7 +78,7 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private class InnerState {
|
||||
val committedStates = createMap()
|
||||
val commitLog = createMap()
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
@ -105,10 +109,21 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
|
||||
)
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
logRequest(txId, callerIdentity, requestSignature)
|
||||
val conflict = commitStates(states, txId)
|
||||
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?) {
|
||||
mutex.locked {
|
||||
logRequest(txId, callerIdentity, requestSignature)
|
||||
val conflictingStates = findAlreadyCommitted(states, commitLog)
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
handleConflicts(txId, conflictingStates)
|
||||
} else {
|
||||
handleNoConflicts(timeWindow, states, txId, commitLog)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
@ -122,25 +137,35 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
private fun commitStates(states: List<StateRef>, txId: SecureHash): Map<StateRef, StateConsumptionDetails>? {
|
||||
val conflict = mutex.locked {
|
||||
val conflictingStates = LinkedHashMap<StateRef, SecureHash>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = committedStates[inputState]
|
||||
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||
}
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
|
||||
val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) }
|
||||
conflict
|
||||
} else {
|
||||
states.forEach { stateRef ->
|
||||
committedStates[stateRef] = txId
|
||||
}
|
||||
log.debug("Successfully committed all input states: $states")
|
||||
null
|
||||
}
|
||||
private fun findAlreadyCommitted(states: List<StateRef>, commitLog: AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>): LinkedHashMap<StateRef, StateConsumptionDetails> {
|
||||
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = commitLog[inputState]
|
||||
if (consumingTx != null) conflictingStates[inputState] = StateConsumptionDetails(consumingTx.sha256())
|
||||
}
|
||||
return conflictingStates
|
||||
}
|
||||
|
||||
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
||||
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
|
||||
log.debug { "Transaction $txId already notarised" }
|
||||
return
|
||||
} else {
|
||||
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
|
||||
val conflictError = NotaryError.Conflict(txId, conflictingStates)
|
||||
throw NotaryInternalException(conflictError)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleNoConflicts(timeWindow: TimeWindow?, states: List<StateRef>, txId: SecureHash, commitLog: AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>) {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), timeWindow)
|
||||
if (outsideTimeWindowError == null) {
|
||||
states.forEach { stateRef ->
|
||||
commitLog[stateRef] = txId
|
||||
}
|
||||
log.debug { "Successfully committed all input states: $states" }
|
||||
} else {
|
||||
throw NotaryInternalException(outsideTimeWindowError)
|
||||
}
|
||||
return conflict
|
||||
}
|
||||
}
|
||||
|
@ -22,16 +22,25 @@ import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.validateTimeWindow
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializationFactory
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding
|
||||
import java.time.Clock
|
||||
|
||||
/**
|
||||
@ -41,8 +50,8 @@ import java.time.Clock
|
||||
* State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member.
|
||||
*/
|
||||
class RaftTransactionCommitLog<E, EK>(
|
||||
val db: CordaPersistence,
|
||||
val nodeClock: Clock,
|
||||
private val db: CordaPersistence,
|
||||
private val nodeClock: Clock,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, E, EK>
|
||||
) : StateMachine(), Snapshottable {
|
||||
object Commands {
|
||||
@ -50,8 +59,9 @@ class RaftTransactionCommitLog<E, EK>(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val requestingParty: String,
|
||||
val requestSignature: ByteArray
|
||||
) : Command<Map<StateRef, SecureHash>> {
|
||||
val requestSignature: ByteArray,
|
||||
val timeWindow: TimeWindow? = null
|
||||
) : Command<NotaryError?> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||
@ -72,25 +82,38 @@ class RaftTransactionCommitLog<E, EK>(
|
||||
private val map = db.transaction { createMap() }
|
||||
|
||||
/** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */
|
||||
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): Map<StateRef, SecureHash> {
|
||||
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): NotaryError? {
|
||||
raftCommit.use {
|
||||
val index = it.index()
|
||||
val conflicts = LinkedHashMap<StateRef, SecureHash>()
|
||||
db.transaction {
|
||||
return db.transaction {
|
||||
val commitCommand = raftCommit.command()
|
||||
logRequest(commitCommand)
|
||||
val states = commitCommand.states
|
||||
val txId = commitCommand.txId
|
||||
log.debug("State machine commit: storing entries with keys (${states.joinToString()})")
|
||||
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
|
||||
for (state in states) {
|
||||
map[state]?.let { conflicts[state] = it.second }
|
||||
map[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.second.sha256()) }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
val entries = states.map { it to Pair(index, txId) }.toMap()
|
||||
map.putAll(entries)
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
if (isConsumedByTheSameTx(commitCommand.txId.sha256(), conflictingStates)) {
|
||||
null
|
||||
} else {
|
||||
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
|
||||
NotaryError.Conflict(txId, conflictingStates)
|
||||
}
|
||||
} else {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), commitCommand.timeWindow)
|
||||
if (outsideTimeWindowError == null) {
|
||||
val entries = states.map { it to Pair(index, txId) }.toMap()
|
||||
map.putAll(entries)
|
||||
log.debug { "Successfully committed all input states: $states" }
|
||||
null
|
||||
} else {
|
||||
outsideTimeWindowError
|
||||
}
|
||||
}
|
||||
}
|
||||
return conflicts
|
||||
}
|
||||
}
|
||||
|
||||
@ -169,103 +192,35 @@ class RaftTransactionCommitLog<E, EK>(
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
// Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
|
||||
@VisibleForTesting
|
||||
val serializer: Serializer by lazy {
|
||||
Serializer().apply {
|
||||
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) {
|
||||
object : TypeSerializer<Commands.CommitTransaction> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction,
|
||||
buffer: BufferOutput<out BufferOutput<*>>,
|
||||
serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.states.size)
|
||||
with(serializer) {
|
||||
obj.states.forEach {
|
||||
writeObject(it, buffer)
|
||||
}
|
||||
writeObject(obj.txId, buffer)
|
||||
}
|
||||
buffer.writeString(obj.requestingParty)
|
||||
buffer.writeInt(obj.requestSignature.size)
|
||||
buffer.write(obj.requestSignature)
|
||||
}
|
||||
registerAbstract(SecureHash::class.java, CordaKryoSerializer::class.java)
|
||||
registerAbstract(TimeWindow::class.java, CordaKryoSerializer::class.java)
|
||||
registerAbstract(NotaryError::class.java, CordaKryoSerializer::class.java)
|
||||
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java, CordaKryoSerializer::class.java)
|
||||
register(RaftTransactionCommitLog.Commands.Get::class.java, CordaKryoSerializer::class.java)
|
||||
register(StateRef::class.java, CordaKryoSerializer::class.java)
|
||||
register(LinkedHashMap::class.java, CordaKryoSerializer::class.java)
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.CommitTransaction>,
|
||||
buffer: BufferInput<out BufferInput<*>>,
|
||||
serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction {
|
||||
val stateCount = buffer.readUnsignedShort()
|
||||
val states = (1..stateCount).map {
|
||||
serializer.readObject<StateRef>(buffer)
|
||||
}
|
||||
val txId = serializer.readObject<SecureHash>(buffer)
|
||||
val name = buffer.readString()
|
||||
val signatureSize = buffer.readInt()
|
||||
val signature = ByteArray(signatureSize)
|
||||
buffer.read(signature)
|
||||
return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(RaftTransactionCommitLog.Commands.Get::class.java) {
|
||||
object : TypeSerializer<Commands.Get> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
serializer.writeObject(obj.key, buffer)
|
||||
}
|
||||
class CordaKryoSerializer<T : Any> : TypeSerializer<T> {
|
||||
private val context = SerializationDefaults.CHECKPOINT_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)
|
||||
private val factory = SerializationFactory.defaultFactory
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.Get>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get {
|
||||
val key = serializer.readObject<StateRef>(buffer)
|
||||
return RaftTransactionCommitLog.Commands.Get(key)
|
||||
}
|
||||
override fun write(obj: T, buffer: BufferOutput<*>, serializer: Serializer) {
|
||||
val serialized = obj.serialize(context = context)
|
||||
buffer.writeInt(serialized.size)
|
||||
buffer.write(serialized.bytes)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
register(StateRef::class.java) {
|
||||
object : TypeSerializer<StateRef> {
|
||||
override fun write(obj: StateRef, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeString(obj.encoded())
|
||||
}
|
||||
|
||||
override fun read(type: Class<StateRef>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): StateRef {
|
||||
return buffer.readString().parseStateRef()
|
||||
}
|
||||
}
|
||||
}
|
||||
registerAbstract(SecureHash::class.java) {
|
||||
object : TypeSerializer<SecureHash> {
|
||||
override fun write(obj: SecureHash, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.bytes.size)
|
||||
buffer.write(obj.bytes)
|
||||
}
|
||||
|
||||
override fun read(type: Class<SecureHash>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): SecureHash {
|
||||
val size = buffer.readUnsignedShort()
|
||||
val bytes = ByteArray(size)
|
||||
buffer.read(bytes)
|
||||
return SecureHash.SHA256(bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(LinkedHashMap::class.java) {
|
||||
object : TypeSerializer<LinkedHashMap<*, *>> {
|
||||
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeInt(obj.size)
|
||||
obj.forEach {
|
||||
with(serializer) {
|
||||
writeObject(it.key, buffer)
|
||||
writeObject(it.value, buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<*, *> {
|
||||
return LinkedHashMap<Any, Any>().apply {
|
||||
repeat(buffer.readInt()) {
|
||||
put(serializer.readObject(buffer), serializer.readObject(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
override fun read(type: Class<T>, buffer: BufferInput<*>, serializer: Serializer): T {
|
||||
val size = buffer.readInt()
|
||||
val serialized = ByteArray(size)
|
||||
buffer.read(serialized)
|
||||
return factory.deserialize(ByteSequence.of(serialized), type, context)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -24,19 +24,19 @@ import io.atomix.copycat.server.cluster.Member
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.config.RaftConfig
|
||||
import net.corda.node.services.transactions.RaftTransactionCommitLog.Commands.CommitTransaction
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
@ -197,22 +197,23 @@ class RaftUniquenessProvider(
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?) {
|
||||
log.debug { "Attempting to commit input states: ${states.joinToString()}" }
|
||||
val commitCommand = CommitTransaction(
|
||||
states,
|
||||
txId,
|
||||
callerIdentity.name.toString(),
|
||||
requestSignature.serialize().bytes
|
||||
requestSignature.serialize().bytes,
|
||||
timeWindow
|
||||
)
|
||||
val conflicts = client.submit(commitCommand).get()
|
||||
if (conflicts.isNotEmpty()) {
|
||||
val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflictingStates)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
log.debug("All input states of transaction $txId have been committed")
|
||||
val commitError = client.submit(commitCommand).get()
|
||||
if (commitError != null) throw NotaryInternalException(commitError)
|
||||
log.debug { "All input states of transaction $txId have been committed" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,15 +75,17 @@ class PersistentUniquenessProviderTests {
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, txID.sha256())
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,9 @@ import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.internal.concurrent.asCordaFuture
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -32,14 +34,14 @@ import net.corda.testing.core.freeLocalHostAndPort
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.hamcrest.Matchers.instanceOf
|
||||
import org.junit.*
|
||||
import org.junit.Assert.assertThat
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class RaftTransactionCommitLogTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
@ -76,8 +78,8 @@ class RaftTransactionCommitLogTests {
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
val conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
val commitError = client.submit(commitCommand).getOrThrow()
|
||||
assertNull(commitError)
|
||||
|
||||
val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0]))
|
||||
val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1]))
|
||||
@ -91,17 +93,60 @@ class RaftTransactionCommitLogTests {
|
||||
val client = cluster.last().client
|
||||
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val txIdFirst = SecureHash.randomSHA256()
|
||||
val txIdSecond = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
var conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
val commitCommandFirst = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdFirst, requestingPartyName.toString(), requestSignature)
|
||||
var commitError = client.submit(commitCommandFirst).getOrThrow()
|
||||
assertNull(commitError)
|
||||
|
||||
conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertEquals(conflict.keys, states.toSet())
|
||||
conflict.forEach { assertEquals(it.value, txId) }
|
||||
val commitCommandSecond = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdSecond, requestingPartyName.toString(), requestSignature)
|
||||
commitError = client.submit(commitCommandSecond).getOrThrow()
|
||||
val conflict = commitError as NotaryError.Conflict
|
||||
assertEquals(states.toSet(), conflict.consumedStates.keys)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transactions outside their time window are rejected`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
val timeWindow = TimeWindow.fromOnly(Instant.MAX)
|
||||
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
states, txId, requestingPartyName.toString(), requestSignature, timeWindow
|
||||
)
|
||||
val commitError = client.submit(commitCommand).getOrThrow()
|
||||
assertThat(commitError, instanceOf(NotaryError.TimeWindowInvalid::class.java))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transactions can be re-notarised outside their time window`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
val timeWindow = TimeWindow.fromOnly(Instant.MIN)
|
||||
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
states, txId, requestingPartyName.toString(), requestSignature, timeWindow
|
||||
)
|
||||
val commitError = client.submit(commitCommand).getOrThrow()
|
||||
assertNull(commitError)
|
||||
|
||||
val expiredTimeWindow = TimeWindow.untilOnly(Instant.MIN)
|
||||
val commitCommand2 = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
states, txId, requestingPartyName.toString(), requestSignature, expiredTimeWindow
|
||||
)
|
||||
val commitError2 = client.submit(commitCommand2).getOrThrow()
|
||||
assertNull(commitError2)
|
||||
}
|
||||
|
||||
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
|
||||
|
@ -35,11 +35,13 @@ import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.TestClock
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
@ -136,6 +138,30 @@ class ValidatingNotaryServiceTests {
|
||||
signatures.forEach { it.verify(stx.id) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should re-sign a transaction with an expired time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.setTimeWindow(Instant.now(), 30.seconds)
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val sig1 = runNotaryClient(stx).getOrThrow().single()
|
||||
assertEquals(sig1.by, notary.owningKey)
|
||||
assertTrue(sig1.isValid(stx.id))
|
||||
|
||||
mockNet.nodes.forEach {
|
||||
val nodeClock = (it.started!!.services.clock as TestClock)
|
||||
nodeClock.advanceBy(Duration.ofDays(1))
|
||||
}
|
||||
|
||||
val sig2 = runNotaryClient(stx).getOrThrow().single()
|
||||
assertEquals(sig2.by, notary.owningKey)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report error for transaction with an invalid time-window`() {
|
||||
val stx = run {
|
||||
|
@ -58,7 +58,7 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
|
||||
import java.lang.reflect.Method
|
||||
import java.nio.file.Path
|
||||
@ -155,11 +155,11 @@ fun <A> rpcDriver(
|
||||
private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 {
|
||||
override fun validateUser(user: String?, password: String?) = isValid(user, password)
|
||||
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?) = isValid(user, password)
|
||||
override fun validateUser(user: String?, password: String?, connection: Connection?): String? {
|
||||
override fun validateUser(user: String?, password: String?, connection: RemotingConnection?): String? {
|
||||
return validate(user, password)
|
||||
}
|
||||
|
||||
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: Connection?): String? {
|
||||
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? {
|
||||
return validate(user, password)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user