Address Dan's feedback

This commit is contained in:
Dimos Raptis 2020-10-01 10:47:34 +01:00
parent d3de729390
commit 40766183a3
5 changed files with 20 additions and 12 deletions

View File

@ -29,7 +29,7 @@ class P2PMessageDeduplicatorTest {
companion object { companion object {
private const val TOPIC = "whatever" private const val TOPIC = "whatever"
private val DATA = ByteSequence.of("blah blah blah".toByteArray()) private val DATA = ByteSequence.of("blah blah blah".toByteArray())
private val SHARD_ID = ShardIdGenerator.generateShardId("some-flow-id") private val SHARD_ID = ShardIdGenerator.generate("some-flow-id")
private val SESSION_ID = SessionId(BigInteger.ONE) private val SESSION_ID = SessionId(BigInteger.ONE)
private val TIMESTAMP = Instant.now() private val TIMESTAMP = Instant.now()
private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL") private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL")

View File

@ -2,6 +2,7 @@ package net.corda.node.services.messaging
import net.corda.node.services.statemachine.MessageType import net.corda.node.services.statemachine.MessageType
import net.corda.node.services.statemachine.SessionId import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.sharding.ShardId
import java.time.Instant import java.time.Instant
/** /**
@ -9,8 +10,7 @@ import java.time.Instant
* It's composed of multiple segments. * It's composed of multiple segments.
* *
* @property messageType the type of the message. * @property messageType the type of the message.
* @property shardIdentifier an identifier that can be used to partition messages into groups for sharding purposes. * @property shardIdentifier the shard identifier of the message.
* This is supposed to have the same value for messages that correspond to the same business-level flow. It is
* @property sessionIdentifier the identifier of the session this message belongs to. This corresponds to the identifier of the session on the receiving side. * @property sessionIdentifier the identifier of the session this message belongs to. This corresponds to the identifier of the session on the receiving side.
* @property sessionSequenceNumber the sequence number of the message inside the session. This can be used to handle out-of-order delivery. * @property sessionSequenceNumber the sequence number of the message inside the session. This can be used to handle out-of-order delivery.
* @property timestamp the time when the message was requested to be sent. * @property timestamp the time when the message was requested to be sent.
@ -18,7 +18,7 @@ import java.time.Instant
*/ */
data class MessageIdentifier( data class MessageIdentifier(
val messageType: MessageType, val messageType: MessageType,
val shardIdentifier: String, val shardIdentifier: ShardId,
val sessionIdentifier: SessionId, val sessionIdentifier: SessionId,
val sessionSequenceNumber: Int, val sessionSequenceNumber: Int,
val timestamp: Instant val timestamp: Instant

View File

@ -25,6 +25,7 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessageIdentifier import net.corda.node.services.messaging.MessageIdentifier
import net.corda.node.services.messaging.SenderSequenceNumber import net.corda.node.services.messaging.SenderSequenceNumber
import net.corda.node.services.messaging.SenderUUID import net.corda.node.services.messaging.SenderUUID
import net.corda.node.services.statemachine.sharding.ShardId
import java.lang.IllegalArgumentException import java.lang.IllegalArgumentException
import java.lang.IllegalStateException import java.lang.IllegalStateException
import java.security.Principal import java.security.Principal
@ -343,7 +344,7 @@ sealed class SessionState {
val bufferedMessages: List<Pair<MessageIdentifier, ExistingSessionMessagePayload>>, val bufferedMessages: List<Pair<MessageIdentifier, ExistingSessionMessagePayload>>,
val rejectionError: FlowError?, val rejectionError: FlowError?,
val nextSendingSeqNumber: Int, val nextSendingSeqNumber: Int,
val shardId: String, val shardId: ShardId,
override val receivedMessages: Map<Int, ExistingSessionMessagePayload>, override val receivedMessages: Map<Int, ExistingSessionMessagePayload>,
override val lastSenderUUID: String?, override val lastSenderUUID: String?,
override val lastSenderSeqNo: Long? override val lastSenderSeqNo: Long?
@ -379,7 +380,7 @@ sealed class SessionState {
val peerSinkSessionId: SessionId, val peerSinkSessionId: SessionId,
val nextSendingSeqNumber: Int, val nextSendingSeqNumber: Int,
val lastProcessedSeqNumber: Int, val lastProcessedSeqNumber: Int,
val shardId: String, val shardId: ShardId,
override val receivedMessages: Map<Int, ExistingSessionMessagePayload>, override val receivedMessages: Map<Int, ExistingSessionMessagePayload>,
override val lastSenderUUID: String?, override val lastSenderUUID: String?,
override val lastSenderSeqNo: Long? override val lastSenderSeqNo: Long?
@ -444,7 +445,7 @@ sealed class FlowStart {
val initiatingMessage: InitialSessionMessage, val initiatingMessage: InitialSessionMessage,
val senderCoreFlowVersion: Int?, val senderCoreFlowVersion: Int?,
val initiatedFlowInfo: FlowInfo, val initiatedFlowInfo: FlowInfo,
val shardIdentifier: String, val shardIdentifier: ShardId,
val senderUUID: String?, val senderUUID: String?,
val senderSequenceNumber: Long? val senderSequenceNumber: Long?
) : FlowStart() { override fun toString() = "Initiated" } ) : FlowStart() { override fun toString() = "Initiated" }

View File

@ -5,8 +5,15 @@ import net.corda.node.services.messaging.MessageIdentifier
class ShardIdGenerator { class ShardIdGenerator {
companion object { companion object {
fun generateShardId(flowIdentifier: String): String { fun generate(flowIdentifier: String): ShardId {
return SecureHash.sha256(flowIdentifier).prefixChars(MessageIdentifier.SHARD_SIZE_IN_CHARS) return SecureHash.sha256(flowIdentifier).prefixChars(MessageIdentifier.SHARD_SIZE_IN_CHARS)
} }
} }
} }
/**
* This is an identifier that can be used to partition messages into groups for sharding purposes.
* It is supposed to have the same value for messages that correspond to the same business-level flow.
*
*/
typealias ShardId = String

View File

@ -184,7 +184,7 @@ class StartedFlowTransition(
val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId
val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage) val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage)
val messageType = MessageType.inferFromMessage(message) val messageType = MessageType.inferFromMessage(message)
val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime)
Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID)) Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID))
} }
val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) -> val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) ->
@ -290,7 +290,7 @@ class StartedFlowTransition(
if (sessionState !is SessionState.Uninitiated) { if (sessionState !is SessionState.Uninitiated) {
continue continue
} }
val shardId = ShardIdGenerator.generateShardId(context.id.toString()) val shardId = ShardIdGenerator.generate(context.id.toString())
val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId() val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId()
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null) val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null)
val newSessionState = SessionState.Initiating( val newSessionState = SessionState.Initiating(
@ -333,7 +333,7 @@ class StartedFlowTransition(
val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) -> val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) ->
val uninitiatedSessionState = sessionState as SessionState.Uninitiated val uninitiatedSessionState = sessionState as SessionState.Uninitiated
val shardId = ShardIdGenerator.generateShardId(context.id.toString()) val shardId = ShardIdGenerator.generate(context.id.toString())
if (sessionState.hasBeenAcknowledged != null) { if (sessionState.hasBeenAcknowledged != null) {
newSessions[sourceSessionId] = SessionState.Initiated( newSessions[sourceSessionId] = SessionState.Initiated(
peerParty = sessionState.hasBeenAcknowledged.first, peerParty = sessionState.hasBeenAcknowledged.first,