mirror of
https://github.com/corda/corda.git
synced 2025-01-05 04:34:14 +00:00
Address Rick's comments
This commit is contained in:
parent
fcee4ed7cb
commit
673f02d635
@ -5,9 +5,9 @@ import net.corda.core.utilities.ByteSequence
|
|||||||
import net.corda.node.services.messaging.MessageIdentifier
|
import net.corda.node.services.messaging.MessageIdentifier
|
||||||
import net.corda.node.services.messaging.P2PMessageDeduplicator
|
import net.corda.node.services.messaging.P2PMessageDeduplicator
|
||||||
import net.corda.node.services.messaging.ReceivedMessage
|
import net.corda.node.services.messaging.ReceivedMessage
|
||||||
import net.corda.node.services.messaging.generateShardId
|
|
||||||
import net.corda.node.services.statemachine.MessageType
|
import net.corda.node.services.statemachine.MessageType
|
||||||
import net.corda.node.services.statemachine.SessionId
|
import net.corda.node.services.statemachine.SessionId
|
||||||
|
import net.corda.node.services.statemachine.sharding.ShardIdGenerator
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
@ -29,7 +29,7 @@ class P2PMessageDeduplicatorTest {
|
|||||||
companion object {
|
companion object {
|
||||||
private const val TOPIC = "whatever"
|
private const val TOPIC = "whatever"
|
||||||
private val DATA = ByteSequence.of("blah blah blah".toByteArray())
|
private val DATA = ByteSequence.of("blah blah blah".toByteArray())
|
||||||
private val SHARD_ID = generateShardId("some-flow-id")
|
private val SHARD_ID = ShardIdGenerator.generateShardId("some-flow-id")
|
||||||
private val SESSION_ID = SessionId(BigInteger.ONE)
|
private val SESSION_ID = SessionId(BigInteger.ONE)
|
||||||
private val TIMESTAMP = Instant.now()
|
private val TIMESTAMP = Instant.now()
|
||||||
private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL")
|
private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL")
|
||||||
|
@ -1,10 +1,7 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.node.services.messaging.MessageIdentifier.Companion.SHARD_SIZE_IN_CHARS
|
|
||||||
import net.corda.node.services.statemachine.MessageType
|
import net.corda.node.services.statemachine.MessageType
|
||||||
import net.corda.node.services.statemachine.SessionId
|
import net.corda.node.services.statemachine.SessionId
|
||||||
import java.lang.IllegalStateException
|
|
||||||
import java.math.BigInteger
|
import java.math.BigInteger
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
@ -39,40 +36,17 @@ data class MessageIdentifier(
|
|||||||
|
|
||||||
fun parse(id: String): MessageIdentifier {
|
fun parse(id: String): MessageIdentifier {
|
||||||
val prefix = id.substring(0, 2)
|
val prefix = id.substring(0, 2)
|
||||||
val messageType = prefixToMessageType(prefix)
|
val messageType = MessageType.fromPrefix(prefix)
|
||||||
val timestamp = java.lang.Long.parseUnsignedLong(id.substring(3, 19), HEX_RADIX)
|
val timestamp = java.lang.Long.parseUnsignedLong(id.substring(3, 19), HEX_RADIX)
|
||||||
val shardIdentifier = id.substring(20, 28)
|
val shardIdentifier = id.substring(20, 28)
|
||||||
val sessionId = BigInteger(id.substring(29, 61), HEX_RADIX)
|
val sessionId = BigInteger(id.substring(29, 61), HEX_RADIX)
|
||||||
val sessionSequenceNumber = Integer.parseInt(id.substring(62), HEX_RADIX)
|
val sessionSequenceNumber = Integer.parseInt(id.substring(62), HEX_RADIX)
|
||||||
return MessageIdentifier(messageType, shardIdentifier, SessionId(sessionId), sessionSequenceNumber, Instant.ofEpochMilli(timestamp))
|
return MessageIdentifier(messageType, shardIdentifier, SessionId(sessionId), sessionSequenceNumber, Instant.ofEpochMilli(timestamp))
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun messageTypeToPrefix(messageType: MessageType): String {
|
|
||||||
return when(messageType) {
|
|
||||||
MessageType.SESSION_INIT -> "XI"
|
|
||||||
MessageType.SESSION_CONFIRM -> "XC"
|
|
||||||
MessageType.SESSION_REJECT -> "XR"
|
|
||||||
MessageType.DATA_MESSAGE -> "XD"
|
|
||||||
MessageType.SESSION_END -> "XE"
|
|
||||||
MessageType.SESSION_ERROR -> "XX"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun prefixToMessageType(prefix: String): MessageType {
|
|
||||||
return when(prefix) {
|
|
||||||
"XI" -> MessageType.SESSION_INIT
|
|
||||||
"XC" -> MessageType.SESSION_CONFIRM
|
|
||||||
"XR" -> MessageType.SESSION_REJECT
|
|
||||||
"XD" -> MessageType.DATA_MESSAGE
|
|
||||||
"XE" -> MessageType.SESSION_END
|
|
||||||
"XX" -> MessageType.SESSION_ERROR
|
|
||||||
else -> throw IllegalStateException("Invalid prefix: $prefix")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun toString(): String {
|
override fun toString(): String {
|
||||||
val prefix = messageTypeToPrefix(messageType)
|
val prefix = messageType.prefix
|
||||||
val encodedSessionIdentifier = String.format("%1$0${SESSION_ID_SIZE_IN_HEX}X", sessionIdentifier.value)
|
val encodedSessionIdentifier = String.format("%1$0${SESSION_ID_SIZE_IN_HEX}X", sessionIdentifier.value)
|
||||||
val encodedSequenceNumber = Integer.toHexString(sessionSequenceNumber).toUpperCase()
|
val encodedSequenceNumber = Integer.toHexString(sessionSequenceNumber).toUpperCase()
|
||||||
val encodedTimestamp = String.format("%1$0${LONG_SIZE_IN_HEX}X", timestamp.toEpochMilli())
|
val encodedTimestamp = String.format("%1$0${LONG_SIZE_IN_HEX}X", timestamp.toEpochMilli())
|
||||||
@ -81,10 +55,6 @@ data class MessageIdentifier(
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun generateShardId(flowIdentifier: String): String {
|
|
||||||
return SecureHash.sha256(flowIdentifier).prefixChars(SHARD_SIZE_IN_CHARS)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A unique identifier for a sender that might be different across restarts.
|
* A unique identifier for a sender that might be different across restarts.
|
||||||
* It is used to help identify when messages are being sent continuously without errors or message are sent after the sender recovered from an error.
|
* It is used to help identify when messages are being sent continuously without errors or message are sent after the sender recovered from an error.
|
||||||
|
@ -94,11 +94,11 @@ interface MessagingService : ServiceLifecycleSupport {
|
|||||||
* Signal that a session has ended to the messaging layer, so that any necessary cleanup is performed.
|
* Signal that a session has ended to the messaging layer, so that any necessary cleanup is performed.
|
||||||
*
|
*
|
||||||
* @param sessionId the identifier of the session that ended.
|
* @param sessionId the identifier of the session that ended.
|
||||||
* @param senderUUID the sender UUID of the last message seen in the session or null if there was no sender UUID in that message.
|
* @param theirSenderUUID the sender UUID last seen from the other side in this session or null if there was no sender UUID seen at all.
|
||||||
* @param senderSequenceNumber the sender sequence number of the last message seen in the session or null if there was no sender sequence number in that message.
|
* @param senderSequenceNumber the last sender sequence number seen from the other side in the session or null if there was no sender sequence number seen at all.
|
||||||
*/
|
*/
|
||||||
@Suspendable
|
@Suspendable
|
||||||
fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?)
|
fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||||
|
@ -551,8 +551,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) {
|
override fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) {
|
||||||
deduplicator.signalSessionEnd(sessionId, senderUUID, senderSequenceNumber)
|
deduplicator.signalSessionEnd(sessionId, theirSenderUUID, senderSequenceNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun resolveTargetToArtemisQueue(address: MessageRecipients): String {
|
override fun resolveTargetToArtemisQueue(address: MessageRecipients): String {
|
||||||
|
@ -4,6 +4,7 @@ import net.corda.core.flows.FlowException
|
|||||||
import net.corda.core.flows.FlowInfo
|
import net.corda.core.flows.FlowInfo
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import net.corda.core.serialization.SerializedBytes
|
import net.corda.core.serialization.SerializedBytes
|
||||||
|
import java.lang.IllegalStateException
|
||||||
import java.math.BigInteger
|
import java.math.BigInteger
|
||||||
import java.security.SecureRandom
|
import java.security.SecureRandom
|
||||||
|
|
||||||
@ -136,15 +137,21 @@ data class RejectSessionMessage(val message: String, val errorId: Long) : Existi
|
|||||||
*/
|
*/
|
||||||
object EndSessionMessage : ExistingSessionMessagePayload()
|
object EndSessionMessage : ExistingSessionMessagePayload()
|
||||||
|
|
||||||
enum class MessageType {
|
enum class MessageType(val prefix: String) {
|
||||||
SESSION_INIT,
|
SESSION_INIT("XI"),
|
||||||
SESSION_CONFIRM,
|
SESSION_CONFIRM("XC"),
|
||||||
SESSION_REJECT,
|
SESSION_REJECT("XR"),
|
||||||
DATA_MESSAGE,
|
DATA_MESSAGE("XD"),
|
||||||
SESSION_END,
|
SESSION_END("XE"),
|
||||||
SESSION_ERROR;
|
SESSION_ERROR("XX");
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
private val reverseMap = values().associateBy(MessageType::prefix)
|
||||||
|
|
||||||
|
fun fromPrefix(prefix: String): MessageType {
|
||||||
|
return reverseMap[prefix] ?: throw IllegalStateException("Invalid prefix: $prefix")
|
||||||
|
}
|
||||||
|
|
||||||
fun inferFromMessage(message: SessionMessage): MessageType {
|
fun inferFromMessage(message: SessionMessage): MessageType {
|
||||||
return when (message) {
|
return when (message) {
|
||||||
is InitialSessionMessage -> SESSION_INIT
|
is InitialSessionMessage -> SESSION_INIT
|
||||||
|
@ -0,0 +1,12 @@
|
|||||||
|
package net.corda.node.services.statemachine.sharding
|
||||||
|
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.node.services.messaging.MessageIdentifier
|
||||||
|
|
||||||
|
class ShardIdGenerator {
|
||||||
|
companion object {
|
||||||
|
fun generateShardId(flowIdentifier: String): String {
|
||||||
|
return SecureHash.sha256(flowIdentifier).prefixChars(MessageIdentifier.SHARD_SIZE_IN_CHARS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -12,8 +12,8 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.toNonEmptySet
|
import net.corda.core.utilities.toNonEmptySet
|
||||||
import net.corda.node.services.messaging.MessageIdentifier
|
import net.corda.node.services.messaging.MessageIdentifier
|
||||||
import net.corda.node.services.messaging.SenderDeduplicationInfo
|
import net.corda.node.services.messaging.SenderDeduplicationInfo
|
||||||
import net.corda.node.services.messaging.generateShardId
|
|
||||||
import net.corda.node.services.statemachine.*
|
import net.corda.node.services.statemachine.*
|
||||||
|
import net.corda.node.services.statemachine.sharding.ShardIdGenerator
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import kotlin.collections.LinkedHashMap
|
import kotlin.collections.LinkedHashMap
|
||||||
|
|
||||||
@ -184,7 +184,7 @@ class StartedFlowTransition(
|
|||||||
val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId
|
val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId
|
||||||
val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage)
|
val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage)
|
||||||
val messageType = MessageType.inferFromMessage(message)
|
val messageType = MessageType.inferFromMessage(message)
|
||||||
val messageIdentifier = MessageIdentifier(messageType, generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime)
|
val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime)
|
||||||
Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID))
|
Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID))
|
||||||
}
|
}
|
||||||
val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) ->
|
val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) ->
|
||||||
@ -290,7 +290,7 @@ class StartedFlowTransition(
|
|||||||
if (sessionState !is SessionState.Uninitiated) {
|
if (sessionState !is SessionState.Uninitiated) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
val shardId = generateShardId(context.id.toString())
|
val shardId = ShardIdGenerator.generateShardId(context.id.toString())
|
||||||
val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId()
|
val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId()
|
||||||
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null)
|
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null)
|
||||||
val newSessionState = SessionState.Initiating(
|
val newSessionState = SessionState.Initiating(
|
||||||
@ -333,7 +333,7 @@ class StartedFlowTransition(
|
|||||||
|
|
||||||
val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) ->
|
val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) ->
|
||||||
val uninitiatedSessionState = sessionState as SessionState.Uninitiated
|
val uninitiatedSessionState = sessionState as SessionState.Uninitiated
|
||||||
val shardId = generateShardId(context.id.toString())
|
val shardId = ShardIdGenerator.generateShardId(context.id.toString())
|
||||||
if (sessionState.hasBeenAcknowledged != null) {
|
if (sessionState.hasBeenAcknowledged != null) {
|
||||||
newSessions[sourceSessionId] = SessionState.Initiated(
|
newSessions[sourceSessionId] = SessionState.Initiated(
|
||||||
peerParty = sessionState.hasBeenAcknowledged.first,
|
peerParty = sessionState.hasBeenAcknowledged.first,
|
||||||
|
@ -169,7 +169,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) {
|
override fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) {
|
||||||
// nothing to do here.
|
// nothing to do here.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user