diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt index 5e68ace6dd..724e345245 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt @@ -29,7 +29,7 @@ class P2PMessageDeduplicatorTest { companion object { private const val TOPIC = "whatever" private val DATA = ByteSequence.of("blah blah blah".toByteArray()) - private val SHARD_ID = ShardIdGenerator.generateShardId("some-flow-id") + private val SHARD_ID = ShardIdGenerator.generate("some-flow-id") private val SESSION_ID = SessionId(BigInteger.ONE) private val TIMESTAMP = Instant.now() private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL") diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt index 9eed4f1a93..87026af103 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt @@ -2,6 +2,7 @@ package net.corda.node.services.messaging import net.corda.node.services.statemachine.MessageType import net.corda.node.services.statemachine.SessionId +import net.corda.node.services.statemachine.sharding.ShardId import java.time.Instant /** @@ -9,8 +10,7 @@ import java.time.Instant * It's composed of multiple segments. * * @property messageType the type of the message. - * @property shardIdentifier an identifier that can be used to partition messages into groups for sharding purposes. - * This is supposed to have the same value for messages that correspond to the same business-level flow. It is + * @property shardIdentifier the shard identifier of the message. * @property sessionIdentifier the identifier of the session this message belongs to. This corresponds to the identifier of the session on the receiving side. * @property sessionSequenceNumber the sequence number of the message inside the session. This can be used to handle out-of-order delivery. * @property timestamp the time when the message was requested to be sent. @@ -18,7 +18,7 @@ import java.time.Instant */ data class MessageIdentifier( val messageType: MessageType, - val shardIdentifier: String, + val shardIdentifier: ShardId, val sessionIdentifier: SessionId, val sessionSequenceNumber: Int, val timestamp: Instant diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 25d647ec29..37b1afe6b2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -25,6 +25,7 @@ import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.MessageIdentifier import net.corda.node.services.messaging.SenderSequenceNumber import net.corda.node.services.messaging.SenderUUID +import net.corda.node.services.statemachine.sharding.ShardId import java.lang.IllegalArgumentException import java.lang.IllegalStateException import java.security.Principal @@ -343,7 +344,7 @@ sealed class SessionState { val bufferedMessages: List>, val rejectionError: FlowError?, val nextSendingSeqNumber: Int, - val shardId: String, + val shardId: ShardId, override val receivedMessages: Map, override val lastSenderUUID: String?, override val lastSenderSeqNo: Long? @@ -379,7 +380,7 @@ sealed class SessionState { val peerSinkSessionId: SessionId, val nextSendingSeqNumber: Int, val lastProcessedSeqNumber: Int, - val shardId: String, + val shardId: ShardId, override val receivedMessages: Map, override val lastSenderUUID: String?, override val lastSenderSeqNo: Long? @@ -444,7 +445,7 @@ sealed class FlowStart { val initiatingMessage: InitialSessionMessage, val senderCoreFlowVersion: Int?, val initiatedFlowInfo: FlowInfo, - val shardIdentifier: String, + val shardIdentifier: ShardId, val senderUUID: String?, val senderSequenceNumber: Long? ) : FlowStart() { override fun toString() = "Initiated" } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt index ccc01663ce..b7b9a232f2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt @@ -5,8 +5,15 @@ import net.corda.node.services.messaging.MessageIdentifier class ShardIdGenerator { companion object { - fun generateShardId(flowIdentifier: String): String { + fun generate(flowIdentifier: String): ShardId { return SecureHash.sha256(flowIdentifier).prefixChars(MessageIdentifier.SHARD_SIZE_IN_CHARS) } } -} \ No newline at end of file +} + +/** + * This is an identifier that can be used to partition messages into groups for sharding purposes. + * It is supposed to have the same value for messages that correspond to the same business-level flow. + * + */ +typealias ShardId = String \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 94b4948e48..8034ac36c1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -184,7 +184,7 @@ class StartedFlowTransition( val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage) val messageType = MessageType.inferFromMessage(message) - val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID)) } val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) -> @@ -290,7 +290,7 @@ class StartedFlowTransition( if (sessionState !is SessionState.Uninitiated) { continue } - val shardId = ShardIdGenerator.generateShardId(context.id.toString()) + val shardId = ShardIdGenerator.generate(context.id.toString()) val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId() val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null) val newSessionState = SessionState.Initiating( @@ -333,7 +333,7 @@ class StartedFlowTransition( val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) -> val uninitiatedSessionState = sessionState as SessionState.Uninitiated - val shardId = ShardIdGenerator.generateShardId(context.id.toString()) + val shardId = ShardIdGenerator.generate(context.id.toString()) if (sessionState.hasBeenAcknowledged != null) { newSessions[sourceSessionId] = SessionState.Initiated( peerParty = sessionState.hasBeenAcknowledged.first,