diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 3e83150ab4..8b9039bbd1 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1590,9 +1590,9 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec public String toString() ## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException - public (String, Throwable, long) - @org.jetbrains.annotations.NotNull public Long getErrorId() - public final long getOriginalErrorId() + public (String) + public (String, Throwable) + public (String, Throwable, Long) ## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object public (java.security.PublicKey) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowException.kt b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt index ac0fbdaa23..8dbe892a74 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowException.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt @@ -36,7 +36,9 @@ open class FlowException(message: String?, cause: Throwable?) : * that we were not expecting), or the other side had an internal error, or the other side terminated when we * were waiting for a response. */ -class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long) : +class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long?) : CordaRuntimeException(message, cause), IdentifiableException { - override fun getErrorId(): Long = originalErrorId + constructor(message: String, cause: Throwable?) : this(message, cause, null) + constructor(message: String) : this(message, null) + override fun getErrorId(): Long? = originalErrorId } diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 9eeddc32d8..b5c8a1b399 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -131,7 +131,6 @@ abstract class FlowLogic { * Note: The current implementation returns the single identity of the node. This will change once multiple identities * is implemented. */ - val ourIdentity: Party get() = stateMachine.ourIdentity // Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 4a55d7163a..5d0cf99dde 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -22,7 +22,7 @@ interface CheckpointStorage { /** * Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the - * underlying database connection is open, so any processing should happen before it is closed. + * underlying database connection is closed, so any processing should happen before it is closed. */ fun getAllCheckpoints(): Stream>> } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 3e72f52b72..fce17e32f3 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -139,7 +139,7 @@ interface ReceivedMessage : Message { val peer: CordaX500Name /** Platform version of the sender's node. */ val platformVersion: Int - /** UUID representing the sending JVM */ + /** Sequence number of message with respect to senderUUID */ val senderSeqNo: Long? /** True if a flow session init message */ val isSessionInit: Boolean diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 73fe65e66f..189d5db813 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -405,7 +405,6 @@ class P2PMessagingClient(val config: NodeConfiguration, } internal fun deliver(artemisMessage: ClientMessage) { - artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> if (!deduplicator.isDuplicate(cordaMessage)) { deduplicator.signalMessageProcessStart(cordaMessage) @@ -418,7 +417,6 @@ class P2PMessagingClient(val config: NodeConfiguration, } private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) { - state.checkNotLocked() val deliverTo = handlers[msg.topic] if (deliverTo != null) { @@ -600,7 +598,6 @@ class P2PMessagingClient(val config: NodeConfiguration, } override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map): Message { - return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders) } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 9e90665ab9..2bd8cac107 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -96,7 +96,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S override fun track(): DataFeed, SignedTransaction> { return txStorage.locked { - DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.wrapWithDatabaseTransaction()) } } @@ -104,7 +104,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S return txStorage.locked { val existingTransaction = get(id) if (existingTransaction == null) { - updatesPublisher.filter { it.id == id }.toFuture() + updates.filter { it.id == id }.toFuture() } else { doneFuture(existingTransaction.toSignedTx()) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 01a90a40d1..3ee44d3803 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -36,7 +36,6 @@ interface FlowMessaging { * Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing. */ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { - companion object { val log = contextLogger() @@ -63,7 +62,6 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { } private fun SessionMessage.additionalHeaders(target: Party): Map { - // This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator. // It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case. val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index 41ee9fad51..82c507f9ba 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -89,7 +89,7 @@ class VaultSoftLockManagerTest { return object : VaultServiceInternal by realVault { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet?) { // Should be called before flow is removed - assertEquals(1, node.smm.allStateMachines.size) + assertEquals(1, node.started!!.smm.allStateMachines.size) mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests. } }