ENT-1540: Make sure transactions with "expired" time windows get re-notarised correctly (#3004)

* ENT-1540: Make sure transactions with "expired" time windows get re-notarised correctly.

Currently the time window is checked before states are being passed to a uniqueness provider. If the time window is
invalid, the transaction will be rejected even if it has already been notarised, which violated idempotency.

For this reason the time window verification was moved alongside state conflict checks.

* Update API - this only affects custom notary interfaces
This commit is contained in:
Andrius Dagys 2018-04-27 15:02:09 +01:00 committed by GitHub
parent 6b78ee8c14
commit efd203e5f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 451 additions and 257 deletions

View File

@ -2112,7 +2112,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
##
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService
public <init>()
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow)
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
@org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
@ -2128,7 +2128,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError()
##
public interface net.corda.core.node.services.UniquenessProvider
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow)
##
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object
public <init>(Map)

View File

@ -146,8 +146,8 @@ class NotaryFlow {
try {
val parts = validateRequest(requestPayload)
txId = parts.id
service.validateTimeWindow(parts.timestamp)
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature)
checkNotary(parts.notary)
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp)
signTransactionAndSendResponse(txId)
} catch (e: NotaryInternalException) {
throw NotaryException(e.error, txId)

View File

@ -1,9 +1,14 @@
package net.corda.core.internal
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.flows.NotarisationResponse
import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import java.time.Instant
/**
* Checks that there are sufficient signatures to satisfy the notary signing requirement and validates the signatures
@ -13,4 +18,19 @@ fun NotarisationResponse.validateSignatures(txId: SecureHash, notary: Party) {
val signingKeys = signatures.map { it.by }
require(notary.owningKey.isFulfilledBy(signingKeys)) { "Insufficient signatures to fulfill the notary signing requirement for $notary" }
signatures.forEach { it.verify(txId) }
}
/** Checks if the provided states were used as inputs in the specified transaction. */
fun isConsumedByTheSameTx(txIdHash: SecureHash, consumedStates: Map<StateRef, StateConsumptionDetails>): Boolean {
val conflicts = consumedStates.filter { (_, cause) ->
cause.hashOfTransactionId != txIdHash
}
return conflicts.isEmpty()
}
/** Returns [NotaryError.TimeWindowInvalid] if [currentTime] is outside the [timeWindow], and *null* otherwise. */
fun validateTimeWindow(currentTime: Instant, timeWindow: TimeWindow?): NotaryError.TimeWindowInvalid? {
return if (timeWindow != null && currentTime !in timeWindow) {
NotaryError.TimeWindowInvalid(currentTime, timeWindow)
} else null
}

View File

@ -17,6 +17,7 @@ abstract class NotaryService : SingletonSerializeAsToken() {
companion object {
@Deprecated("No longer used")
const val ID_PREFIX = "corda.notary."
@Deprecated("No longer used")
fun constructId(validating: Boolean, raft: Boolean = false, bft: Boolean = false, custom: Boolean = false): String {
require(Booleans.countTrue(raft, bft, custom) <= 1) { "At most one of raft, bft or custom may be true" }
@ -79,9 +80,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
* this method does not throw an exception when input states are present multiple times within the transaction.
*/
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) {
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
try {
uniquenessProvider.commit(inputs, txId, caller, requestSignature)
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow)
} catch (e: NotaryInternalException) {
if (e.error is NotaryError.Conflict) {
val conflicts = inputs.filterIndexed { _, stateRef ->

View File

@ -2,6 +2,7 @@ package net.corda.core.node.services
import net.corda.core.CordaException
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
@ -15,7 +16,13 @@ import net.corda.core.serialization.CordaSerializable
*/
interface UniquenessProvider {
/** Commits all input states of the given transaction. */
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature)
fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow? = null
)
/** Specifies the consuming transaction for every conflicting state. */
@CordaSerializable

View File

@ -143,9 +143,9 @@ dependencies {
compileOnly "co.paralleluniverse:capsule:$capsule_version"
// Java Atomix: RAFT library
compile 'io.atomix.copycat:copycat-client:1.2.3'
compile 'io.atomix.copycat:copycat-server:1.2.3'
compile 'io.atomix.catalyst:catalyst-netty:1.1.2'
compile 'io.atomix.copycat:copycat-client:1.2.8'
compile 'io.atomix.copycat:copycat-server:1.2.8'
compile 'io.atomix.catalyst:catalyst-netty:1.2.1'
// Netty: All of it.
compile "io.netty:netty-all:$netty_version"

View File

@ -5,8 +5,8 @@ import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.sha256
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
@ -31,87 +31,78 @@ import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.hamcrest.Matchers.instanceOf
import org.junit.*
import org.junit.Assert.assertThat
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class BFTNotaryServiceTests {
private lateinit var mockNet: InternalMockNetwork
private lateinit var notary: Party
private lateinit var node: StartedNode<MockNode>
companion object {
private lateinit var mockNet: InternalMockNetwork
private lateinit var notary: Party
private lateinit var node: StartedNode<MockNode>
@Before
fun before() {
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
}
@After
fun stopNodes() {
mockNet.stopNodes()
}
private fun startBftClusterAndNode(clusterSize: Int, exposeRaces: Boolean = false) {
(Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists?
val replicaIds = (0 until clusterSize)
notary = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
CordaX500Name("BFT", "Zurich", "CH"))
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notary, false))))
val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
val nodes = replicaIds.map { replicaId ->
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces))
doReturn(notary).whenever(it).notary
}))
} + mockNet.createUnstartedNode()
// MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the
// network-parameters in their directories before they're started.
node = nodes.map { node ->
networkParameters.install(mockNet.baseDirectory(node.id))
node.start()
}.last()
}
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
@Test
fun `all replicas start even if there is a new consensus during startup`() {
startBftClusterAndNode(minClusterSize(1), exposeRaces = true) // This true adds a sleep to expose the race.
val f = node.run {
val trivialTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
// Create a new consensus while the redundant replica is sleeping:
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
@BeforeClass
@JvmStatic
fun before() {
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
val clusterSize = minClusterSize(1)
val started = startBftClusterAndNode(clusterSize, mockNet)
notary = started.first
node = started.second
}
@AfterClass
@JvmStatic
fun stopNodes() {
mockNet.stopNodes()
}
fun startBftClusterAndNode(clusterSize: Int, mockNet: InternalMockNetwork, exposeRaces: Boolean = false): Pair<Party, StartedNode<MockNode>> {
(Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists?
val replicaIds = (0 until clusterSize)
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
CordaX500Name("BFT", "Zurich", "CH"))
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
val nodes = replicaIds.map { replicaId ->
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces))
doReturn(notary).whenever(it).notary
}))
} + mockNet.createUnstartedNode()
// MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the
// network-parameters in their directories before they're started.
val node = nodes.map { node ->
networkParameters.install(mockNet.baseDirectory(node.id))
node.start()
}.last()
return Pair(notaryIdentity, node)
}
mockNet.runNetwork()
f.getOrThrow()
}
@Test
fun `detect double spend 1 faulty`() {
detectDoubleSpend(1)
}
@Test
fun `detect double spend 2 faulty`() {
detectDoubleSpend(2)
}
private fun detectDoubleSpend(faultyReplicas: Int) {
val clusterSize = minClusterSize(faultyReplicas)
startBftClusterAndNode(clusterSize)
fun `detect double spend`() {
node.run {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
@ -132,7 +123,7 @@ class BFTNotaryServiceTests {
val successfulIndex = results.mapIndexedNotNull { index, result ->
if (result is Try.Success) {
val signers = result.value.map { it.by }
assertEquals(minCorrectReplicas(clusterSize), signers.size)
assertEquals(minCorrectReplicas(3), signers.size)
signers.forEach {
assertTrue(it in (notary.owningKey as CompositeKey).leafKeys)
}
@ -154,6 +145,63 @@ class BFTNotaryServiceTests {
}
}
@Test
fun `transactions outside their time window are rejected`() {
node.run {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTx = signInitialTransaction(notary) {
addInputState(issueTx.tx.outRef<ContractState>(0))
setTimeWindow(TimeWindow.fromOnly(Instant.MAX))
}
val flow = NotaryFlow.Client(spendTx)
val resultFuture = services.startFlow(flow).resultFuture
mockNet.runNetwork()
val exception = assertFailsWith<ExecutionException> { resultFuture.get() }
assertThat(exception.cause, instanceOf(NotaryException::class.java))
val error = (exception.cause as NotaryException).error
assertThat(error, instanceOf(NotaryError.TimeWindowInvalid::class.java))
}
}
@Test
fun `transactions can be re-notarised outside their time window`() {
node.run {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTx = signInitialTransaction(notary) {
addInputState(issueTx.tx.outRef<ContractState>(0))
setTimeWindow(TimeWindow.untilOnly(Instant.now() + Duration.ofHours(1)))
}
val resultFuture = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
mockNet.runNetwork()
val signatures = resultFuture.get()
verifySignatures(signatures, spendTx.id)
for (node in mockNet.nodes) {
(node.started!!.services.clock as TestClock).advanceBy(Duration.ofDays(1))
}
val resultFuture2 = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
mockNet.runNetwork()
val signatures2 = resultFuture2.get()
verifySignatures(signatures2, spendTx.id)
}
}
private fun verifySignatures(signatures: List<TransactionSignature>, txId: SecureHash) {
notary.owningKey.isFulfilledBy(signatures.map { it.by })
signatures.forEach { it.verify(txId) }
}
private fun StartedNode<MockNode>.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
return services.signInitialTransaction(
TransactionBuilder(notary).apply {

View File

@ -0,0 +1,59 @@
package net.corda.node.services
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode
import net.corda.node.services.transactions.minClusterSize
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
class BFTSMaRtTests {
private lateinit var mockNet: InternalMockNetwork
@Before
fun before() {
mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts"))
}
@After
fun stopNodes() {
mockNet.stopNodes()
}
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
@Test
fun `all replicas start even if there is a new consensus during startup`() {
val clusterSize = minClusterSize(1)
val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race.
val f = node.run {
val trivialTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
// Create a new consensus while the redundant replica is sleeping:
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
}
mockNet.runNetwork()
f.getOrThrow()
}
private fun StartedNode<MockNode>.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
return services.signInitialTransaction(
TransactionBuilder(notary).apply {
addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey))
block()
}
)
}
}

View File

@ -139,9 +139,9 @@ class BFTNonValidatingNotaryService(
val id = transaction.id
val inputs = transaction.inputs
val notary = transaction.notary
if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow)
val timeWindow = (transaction as? FilteredTransaction)?.timeWindow
if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary)
commitInputStates(inputs, id, callerIdentity.name, requestSignature)
commitInputStates(inputs, id, callerIdentity.name, requestSignature, timeWindow)
log.debug { "Inputs committed successfully, signing $id" }
BFTSMaRt.ReplicaResponse.Signature(sign(id))
} catch (e: NotaryInternalException) {

View File

@ -13,12 +13,15 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier
import bftsmart.tom.util.Extractor
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.declaredField
import net.corda.core.internal.isConsumedByTheSameTx
import net.corda.core.internal.toTypedArray
import net.corda.core.internal.validateTimeWindow
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
@ -216,25 +219,27 @@ object BFTSMaRt {
*/
abstract fun executeCommand(command: ByteArray): ByteArray?
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
log.debug { "Attempting to commit inputs for transaction: $txId" }
val conflicts = mutableMapOf<StateRef, SecureHash>()
services.database.transaction {
logRequest(txId, callerName, requestSignature)
states.forEach { state ->
commitLog[state]?.let { conflicts[state] = it }
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
for (state in states) {
commitLog[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.sha256()) }
}
if (conflicts.isEmpty()) {
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
states.forEach { stateRef ->
commitLog[stateRef] = txId
if (conflictingStates.isNotEmpty()) {
if (!isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
throw NotaryInternalException(NotaryError.Conflict(txId, conflictingStates))
}
} else {
log.debug { "Conflict detected the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
val error = NotaryError.Conflict(txId, conflict)
throw NotaryInternalException(error)
val outsideTimeWindowError = validateTimeWindow(services.clock.instant(), timeWindow)
if (outsideTimeWindowError == null) {
states.forEach { commitLog[it] = txId }
log.debug { "Successfully committed all input states: $states" }
} else {
throw NotaryInternalException(outsideTimeWindowError)
}
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature
@ -9,12 +10,15 @@ import net.corda.core.flows.NotaryInternalException
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.isConsumedByTheSameTx
import net.corda.core.internal.validateTimeWindow
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
@ -64,7 +68,7 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
private class InnerState {
val committedStates = createMap()
val commitLog = createMap()
}
private val mutex = ThreadBox(InnerState())
@ -95,10 +99,21 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
)
}
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
logRequest(txId, callerIdentity, requestSignature)
val conflict = commitStates(states, txId)
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
override fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?) {
mutex.locked {
logRequest(txId, callerIdentity, requestSignature)
val conflictingStates = findAlreadyCommitted(states, commitLog)
if (conflictingStates.isNotEmpty()) {
handleConflicts(txId, conflictingStates)
} else {
handleNoConflicts(timeWindow, states, txId, commitLog)
}
}
}
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
@ -112,25 +127,35 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
session.persist(request)
}
private fun commitStates(states: List<StateRef>, txId: SecureHash): Map<StateRef, StateConsumptionDetails>? {
val conflict = mutex.locked {
val conflictingStates = LinkedHashMap<StateRef, SecureHash>()
for (inputState in states) {
val consumingTx = committedStates[inputState]
if (consumingTx != null) conflictingStates[inputState] = consumingTx
}
if (conflictingStates.isNotEmpty()) {
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) }
conflict
} else {
states.forEach { stateRef ->
committedStates[stateRef] = txId
}
log.debug("Successfully committed all input states: $states")
null
}
private fun findAlreadyCommitted(states: List<StateRef>, commitLog: AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>): LinkedHashMap<StateRef, StateConsumptionDetails> {
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
for (inputState in states) {
val consumingTx = commitLog[inputState]
if (consumingTx != null) conflictingStates[inputState] = StateConsumptionDetails(consumingTx.sha256())
}
return conflictingStates
}
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Transaction $txId already notarised" }
return
} else {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
val conflictError = NotaryError.Conflict(txId, conflictingStates)
throw NotaryInternalException(conflictError)
}
}
private fun handleNoConflicts(timeWindow: TimeWindow?, states: List<StateRef>, txId: SecureHash, commitLog: AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>) {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), timeWindow)
if (outsideTimeWindowError == null) {
states.forEach { stateRef ->
commitLog[stateRef] = txId
}
log.debug { "Successfully committed all input states: $states" }
} else {
throw NotaryInternalException(outsideTimeWindowError)
}
return conflict
}
}

View File

@ -12,16 +12,25 @@ import io.atomix.copycat.server.StateMachine
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.isConsumedByTheSameTx
import net.corda.core.internal.validateTimeWindow
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.contextLogger
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef
import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding
import java.time.Clock
/**
@ -31,8 +40,8 @@ import java.time.Clock
* State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member.
*/
class RaftTransactionCommitLog<E, EK>(
val db: CordaPersistence,
val nodeClock: Clock,
private val db: CordaPersistence,
private val nodeClock: Clock,
createMap: () -> AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, E, EK>
) : StateMachine(), Snapshottable {
object Commands {
@ -40,8 +49,9 @@ class RaftTransactionCommitLog<E, EK>(
val states: List<StateRef>,
val txId: SecureHash,
val requestingParty: String,
val requestSignature: ByteArray
) : Command<Map<StateRef, SecureHash>> {
val requestSignature: ByteArray,
val timeWindow: TimeWindow? = null
) : Command<NotaryError?> {
override fun compaction(): Command.CompactionMode {
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
@ -62,25 +72,38 @@ class RaftTransactionCommitLog<E, EK>(
private val map = db.transaction { createMap() }
/** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): Map<StateRef, SecureHash> {
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): NotaryError? {
raftCommit.use {
val index = it.index()
val conflicts = LinkedHashMap<StateRef, SecureHash>()
db.transaction {
return db.transaction {
val commitCommand = raftCommit.command()
logRequest(commitCommand)
val states = commitCommand.states
val txId = commitCommand.txId
log.debug("State machine commit: storing entries with keys (${states.joinToString()})")
val conflictingStates = LinkedHashMap<StateRef, StateConsumptionDetails>()
for (state in states) {
map[state]?.let { conflicts[state] = it.second }
map[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.second.sha256()) }
}
if (conflicts.isEmpty()) {
val entries = states.map { it to Pair(index, txId) }.toMap()
map.putAll(entries)
if (conflictingStates.isNotEmpty()) {
if (isConsumedByTheSameTx(commitCommand.txId.sha256(), conflictingStates)) {
null
} else {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
NotaryError.Conflict(txId, conflictingStates)
}
} else {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), commitCommand.timeWindow)
if (outsideTimeWindowError == null) {
val entries = states.map { it to Pair(index, txId) }.toMap()
map.putAll(entries)
log.debug { "Successfully committed all input states: $states" }
null
} else {
outsideTimeWindowError
}
}
}
return conflicts
}
}
@ -159,103 +182,35 @@ class RaftTransactionCommitLog<E, EK>(
companion object {
private val log = contextLogger()
// Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
@VisibleForTesting
val serializer: Serializer by lazy {
Serializer().apply {
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) {
object : TypeSerializer<Commands.CommitTransaction> {
override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction,
buffer: BufferOutput<out BufferOutput<*>>,
serializer: Serializer) {
buffer.writeUnsignedShort(obj.states.size)
with(serializer) {
obj.states.forEach {
writeObject(it, buffer)
}
writeObject(obj.txId, buffer)
}
buffer.writeString(obj.requestingParty)
buffer.writeInt(obj.requestSignature.size)
buffer.write(obj.requestSignature)
}
registerAbstract(SecureHash::class.java, CordaKryoSerializer::class.java)
registerAbstract(TimeWindow::class.java, CordaKryoSerializer::class.java)
registerAbstract(NotaryError::class.java, CordaKryoSerializer::class.java)
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java, CordaKryoSerializer::class.java)
register(RaftTransactionCommitLog.Commands.Get::class.java, CordaKryoSerializer::class.java)
register(StateRef::class.java, CordaKryoSerializer::class.java)
register(LinkedHashMap::class.java, CordaKryoSerializer::class.java)
}
}
override fun read(type: Class<RaftTransactionCommitLog.Commands.CommitTransaction>,
buffer: BufferInput<out BufferInput<*>>,
serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction {
val stateCount = buffer.readUnsignedShort()
val states = (1..stateCount).map {
serializer.readObject<StateRef>(buffer)
}
val txId = serializer.readObject<SecureHash>(buffer)
val name = buffer.readString()
val signatureSize = buffer.readInt()
val signature = ByteArray(signatureSize)
buffer.read(signature)
return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature)
}
}
}
register(RaftTransactionCommitLog.Commands.Get::class.java) {
object : TypeSerializer<Commands.Get> {
override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
serializer.writeObject(obj.key, buffer)
}
class CordaKryoSerializer<T : Any> : TypeSerializer<T> {
private val context = SerializationDefaults.CHECKPOINT_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)
private val factory = SerializationFactory.defaultFactory
override fun read(type: Class<RaftTransactionCommitLog.Commands.Get>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get {
val key = serializer.readObject<StateRef>(buffer)
return RaftTransactionCommitLog.Commands.Get(key)
}
override fun write(obj: T, buffer: BufferOutput<*>, serializer: Serializer) {
val serialized = obj.serialize(context = context)
buffer.writeInt(serialized.size)
buffer.write(serialized.bytes)
}
}
}
register(StateRef::class.java) {
object : TypeSerializer<StateRef> {
override fun write(obj: StateRef, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
buffer.writeString(obj.encoded())
}
override fun read(type: Class<StateRef>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): StateRef {
return buffer.readString().parseStateRef()
}
}
}
registerAbstract(SecureHash::class.java) {
object : TypeSerializer<SecureHash> {
override fun write(obj: SecureHash, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
buffer.writeUnsignedShort(obj.bytes.size)
buffer.write(obj.bytes)
}
override fun read(type: Class<SecureHash>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): SecureHash {
val size = buffer.readUnsignedShort()
val bytes = ByteArray(size)
buffer.read(bytes)
return SecureHash.SHA256(bytes)
}
}
}
register(LinkedHashMap::class.java) {
object : TypeSerializer<LinkedHashMap<*, *>> {
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
buffer.writeInt(obj.size)
obj.forEach {
with(serializer) {
writeObject(it.key, buffer)
writeObject(it.value, buffer)
}
}
}
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<*, *> {
return LinkedHashMap<Any, Any>().apply {
repeat(buffer.readInt()) {
put(serializer.readObject(buffer), serializer.readObject(buffer))
}
}
}
}
}
override fun read(type: Class<T>, buffer: BufferInput<*>, serializer: Serializer): T {
val size = buffer.readInt()
val serialized = ByteArray(size)
buffer.read(serialized)
return factory.deserialize(ByteSequence.of(serialized), type, context)
}
}
}
}
}

View File

@ -14,19 +14,19 @@ import io.atomix.copycat.server.cluster.Member
import io.atomix.copycat.server.storage.Storage
import io.atomix.copycat.server.storage.StorageLevel
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryInternalException
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.config.RaftConfig
import net.corda.node.services.transactions.RaftTransactionCommitLog.Commands.CommitTransaction
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.SSLConfiguration
@ -187,22 +187,23 @@ class RaftUniquenessProvider(
})
}
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
log.debug("Attempting to commit input states: ${states.joinToString()}")
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
override fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?) {
log.debug { "Attempting to commit input states: ${states.joinToString()}" }
val commitCommand = CommitTransaction(
states,
txId,
callerIdentity.name.toString(),
requestSignature.serialize().bytes
requestSignature.serialize().bytes,
timeWindow
)
val conflicts = client.submit(commitCommand).get()
if (conflicts.isNotEmpty()) {
val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
val error = NotaryError.Conflict(txId, conflictingStates)
throw NotaryInternalException(error)
}
log.debug("All input states of transaction $txId have been committed")
val commitError = client.submit(commitCommand).get()
if (commitError != null) throw NotaryInternalException(commitError)
log.debug { "All input states of transaction $txId have been committed" }
}
}

View File

@ -65,15 +65,17 @@ class PersistentUniquenessProviderTests {
val inputState = generateStateRef()
val inputs = listOf(inputState)
provider.commit(inputs, txID, identity, requestSignature)
val firstTxId = txID
provider.commit(inputs, firstTxId, identity, requestSignature)
val secondTxId = SecureHash.randomSHA256()
val ex = assertFailsWith<NotaryInternalException> {
provider.commit(inputs, txID, identity, requestSignature)
provider.commit(inputs, secondTxId, identity, requestSignature)
}
val error = ex.error as NotaryError.Conflict
val conflictCause = error.consumedStates[inputState]!!
assertEquals(conflictCause.hashOfTransactionId, txID.sha256())
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
}
}
}

View File

@ -7,7 +7,9 @@ import io.atomix.copycat.server.CopycatServer
import io.atomix.copycat.server.storage.Storage
import io.atomix.copycat.server.storage.StorageLevel
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotaryError
import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
@ -22,14 +24,14 @@ import net.corda.testing.core.freeLocalHostAndPort
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.hamcrest.Matchers.instanceOf
import org.junit.*
import org.junit.Assert.assertThat
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CompletableFuture
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.test.assertNull
class RaftTransactionCommitLogTests {
data class Member(val client: CopycatClient, val server: CopycatServer)
@ -66,8 +68,8 @@ class RaftTransactionCommitLogTests {
val requestSignature = ByteArray(1024)
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
val conflict = client.submit(commitCommand).getOrThrow()
assertTrue { conflict.isEmpty() }
val commitError = client.submit(commitCommand).getOrThrow()
assertNull(commitError)
val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0]))
val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1]))
@ -81,17 +83,60 @@ class RaftTransactionCommitLogTests {
val client = cluster.last().client
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
val txId: SecureHash = SecureHash.randomSHA256()
val txIdFirst = SecureHash.randomSHA256()
val txIdSecond = SecureHash.randomSHA256()
val requestingPartyName = ALICE_NAME
val requestSignature = ByteArray(1024)
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
var conflict = client.submit(commitCommand).getOrThrow()
assertTrue { conflict.isEmpty() }
val commitCommandFirst = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdFirst, requestingPartyName.toString(), requestSignature)
var commitError = client.submit(commitCommandFirst).getOrThrow()
assertNull(commitError)
conflict = client.submit(commitCommand).getOrThrow()
assertEquals(conflict.keys, states.toSet())
conflict.forEach { assertEquals(it.value, txId) }
val commitCommandSecond = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdSecond, requestingPartyName.toString(), requestSignature)
commitError = client.submit(commitCommandSecond).getOrThrow()
val conflict = commitError as NotaryError.Conflict
assertEquals(states.toSet(), conflict.consumedStates.keys)
}
@Test
fun `transactions outside their time window are rejected`() {
val client = cluster.last().client
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
val txId: SecureHash = SecureHash.randomSHA256()
val requestingPartyName = ALICE_NAME
val requestSignature = ByteArray(1024)
val timeWindow = TimeWindow.fromOnly(Instant.MAX)
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
states, txId, requestingPartyName.toString(), requestSignature, timeWindow
)
val commitError = client.submit(commitCommand).getOrThrow()
assertThat(commitError, instanceOf(NotaryError.TimeWindowInvalid::class.java))
}
@Test
fun `transactions can be re-notarised outside their time window`() {
val client = cluster.last().client
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
val txId: SecureHash = SecureHash.randomSHA256()
val requestingPartyName = ALICE_NAME
val requestSignature = ByteArray(1024)
val timeWindow = TimeWindow.fromOnly(Instant.MIN)
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
states, txId, requestingPartyName.toString(), requestSignature, timeWindow
)
val commitError = client.submit(commitCommand).getOrThrow()
assertNull(commitError)
val expiredTimeWindow = TimeWindow.untilOnly(Instant.MIN)
val commitCommand2 = RaftTransactionCommitLog.Commands.CommitTransaction(
states, txId, requestingPartyName.toString(), requestSignature, expiredTimeWindow
)
val commitError2 = client.submit(commitCommand2).getOrThrow()
assertNull(commitError2)
}
private fun setUpCluster(nodeCount: Int = 3): List<Member> {

View File

@ -25,11 +25,13 @@ import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
@ -126,6 +128,30 @@ class ValidatingNotaryServiceTests {
signatures.forEach { it.verify(stx.id) }
}
@Test
fun `should re-sign a transaction with an expired time-window`() {
val stx = run {
val inputState = issueState(aliceNode.services, alice)
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(alice.owningKey))
.setTimeWindow(Instant.now(), 30.seconds)
aliceNode.services.signInitialTransaction(tx)
}
val sig1 = runNotaryClient(stx).getOrThrow().single()
assertEquals(sig1.by, notary.owningKey)
assertTrue(sig1.isValid(stx.id))
mockNet.nodes.forEach {
val nodeClock = (it.started!!.services.clock as TestClock)
nodeClock.advanceBy(Duration.ofDays(1))
}
val sig2 = runNotaryClient(stx).getOrThrow().single()
assertEquals(sig2.by, notary.owningKey)
}
@Test
fun `should report error for transaction with an invalid time-window`() {
val stx = run {