mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
Address comments
This commit is contained in:
@ -22,7 +22,7 @@ interface CheckpointStorage {
|
||||
|
||||
/**
|
||||
* Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the
|
||||
* underlying database connection is open, so any processing should happen before it is closed.
|
||||
* underlying database connection is closed, so any processing should happen before it is closed.
|
||||
*/
|
||||
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ interface ReceivedMessage : Message {
|
||||
val peer: CordaX500Name
|
||||
/** Platform version of the sender's node. */
|
||||
val platformVersion: Int
|
||||
/** UUID representing the sending JVM */
|
||||
/** Sequence number of message with respect to senderUUID */
|
||||
val senderSeqNo: Long?
|
||||
/** True if a flow session init message */
|
||||
val isSessionInit: Boolean
|
||||
|
@ -405,7 +405,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
internal fun deliver(artemisMessage: ClientMessage) {
|
||||
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
@ -418,7 +417,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) {
|
||||
|
||||
state.checkNotLocked()
|
||||
val deliverTo = handlers[msg.topic]
|
||||
if (deliverTo != null) {
|
||||
@ -600,7 +598,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message {
|
||||
|
||||
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders)
|
||||
}
|
||||
|
||||
|
@ -96,7 +96,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
|
||||
|
||||
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
|
||||
return txStorage.locked {
|
||||
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.wrapWithDatabaseTransaction())
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,7 +104,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
|
||||
return txStorage.locked {
|
||||
val existingTransaction = get(id)
|
||||
if (existingTransaction == null) {
|
||||
updatesPublisher.filter { it.id == id }.toFuture()
|
||||
updates.filter { it.id == id }.toFuture()
|
||||
} else {
|
||||
doneFuture(existingTransaction.toSignedTx())
|
||||
}
|
||||
|
@ -36,7 +36,6 @@ interface FlowMessaging {
|
||||
* Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing.
|
||||
*/
|
||||
class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
|
||||
@ -63,7 +62,6 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
}
|
||||
|
||||
private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> {
|
||||
|
||||
// This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator.
|
||||
// It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case.
|
||||
val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name }
|
||||
|
@ -89,7 +89,7 @@ class VaultSoftLockManagerTest {
|
||||
return object : VaultServiceInternal by realVault {
|
||||
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
|
||||
// Should be called before flow is removed
|
||||
assertEquals(1, node.smm.allStateMachines.size)
|
||||
assertEquals(1, node.started!!.smm.allStateMachines.size)
|
||||
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user