Deprecate deduplicationSeed, initiationEntropy fields from sessionState and session-init messages

This commit is contained in:
Dimos Raptis 2020-09-15 13:10:20 +01:00
parent 64e7fdd83a
commit 086a406fc5
7 changed files with 8 additions and 26 deletions

View File

@ -52,7 +52,6 @@ data class SessionId(val value: BigInteger) {
*
* @param initiatorSessionId the session ID of the initiator. On the sending side this is the *source* ID, on the
* receiving side this is the *sink* ID.
* @param initiationEntropy additional randomness to seed the initiated flow's deduplication ID.
* @param initiatorFlowClassName the class name to be used to determine the initiating-initiated mapping on the receiver
* side.
* @param flowVersion the version of the initiating flow.
@ -61,7 +60,6 @@ data class SessionId(val value: BigInteger) {
*/
data class InitialSessionMessage(
val initiatorSessionId: SessionId,
val initiationEntropy: Long,
val initiatorFlowClassName: String,
val flowVersion: Int,
val appName: String,
@ -69,7 +67,6 @@ data class InitialSessionMessage(
) : SessionMessage() {
override fun toString() = "InitialSessionMessage(" +
"initiatorSessionId=$initiatorSessionId, " +
"initiationEntropy=$initiationEntropy, " +
"initiatorFlowClassName=$initiatorFlowClassName, " +
"appName=$appName, " +
"firstPayload=${firstPayload?.javaClass}" +

View File

@ -271,8 +271,6 @@ data class CheckpointState(
*/
sealed class SessionState {
abstract val deduplicationSeed: String
/**
* the sender UUID last seen in this session, if there was one.
*/
@ -328,15 +326,12 @@ sealed class SessionState {
val destination: Destination,
val initiatingSubFlow: SubFlow.Initiating,
val sourceSessionId: SessionId,
val additionalEntropy: Long,
val hasBeenAcknowledged: Pair<Party, ConfirmSessionMessage>?,
val hasBeenRejected: RejectSessionMessage?,
override val receivedMessages: Map<Int, ExistingSessionMessagePayload>,
override val lastSenderUUID: String?,
override val lastSenderSeqNo: Long?
) : SessionState() {
override val deduplicationSeed: String get() = "R-${sourceSessionId.value}-$additionalEntropy"
}
) : SessionState()
/**
* We have sent the initialisation message but have not yet received a confirmation.
@ -348,7 +343,6 @@ sealed class SessionState {
data class Initiating(
val bufferedMessages: List<Pair<MessageIdentifier, ExistingSessionMessagePayload>>,
val rejectionError: FlowError?,
override val deduplicationSeed: String,
val nextSendingSeqNumber: Int,
val shardId: String,
override val receivedMessages: Map<Int, ExistingSessionMessagePayload>,
@ -384,7 +378,6 @@ sealed class SessionState {
val peerFlowInfo: FlowInfo,
val otherSideErrored: Boolean,
val peerSinkSessionId: SessionId,
override val deduplicationSeed: String,
val nextSendingSeqNumber: Int,
val lastProcessedSeqNumber: Int,
val shardId: String,

View File

@ -90,7 +90,6 @@ class DeliverSessionMessageTransition(
peerFlowInfo = message.initiatedFlowInfo,
receivedMessages = emptyMap(),
peerSinkSessionId = message.initiatedSessionId,
deduplicationSeed = sessionState.deduplicationSeed,
otherSideErrored = false,
nextSendingSeqNumber = sessionState.nextSendingSeqNumber,
lastProcessedSeqNumber = 0,

View File

@ -292,11 +292,10 @@ class StartedFlowTransition(
}
val shardId = generateShardId(context.id.toString())
val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId()
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null)
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null)
val newSessionState = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null,
deduplicationSeed = sessionState.deduplicationSeed,
nextSendingSeqNumber = 1,
shardId = shardId,
receivedMessages = emptyMap(),
@ -342,7 +341,6 @@ class StartedFlowTransition(
receivedMessages = emptyMap(),
otherSideErrored = false,
peerSinkSessionId = sessionState.hasBeenAcknowledged.second.initiatedSessionId,
deduplicationSeed = sessionState.deduplicationSeed,
nextSendingSeqNumber = 1,
lastProcessedSeqNumber = 0,
shardId = shardId,
@ -354,14 +352,13 @@ class StartedFlowTransition(
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null,
deduplicationSeed = sessionState.deduplicationSeed,
nextSendingSeqNumber = 1,
shardId = shardId,
receivedMessages = emptyMap(),
lastSenderUUID = null,
lastSenderSeqNo = null
)
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message)
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, message)
val messageType = MessageType.inferFromMessage(initialMessage)
val messageIdentifier = MessageIdentifier(messageType, shardId, sourceSessionId.calculateInitiatedSessionId(), 0, checkpoint.checkpointState.suspensionTime)
Action.SendInitial(uninitiatedSessionState.destination, initialMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID))
@ -527,13 +524,10 @@ class StartedFlowTransition(
private fun createInitialSessionMessage(
initiatingSubFlow: SubFlow.Initiating,
sourceSessionId: SessionId,
additionalEntropy: Long,
payload: SerializedBytes<Any>?
): InitialSessionMessage {
return InitialSessionMessage(
initiatorSessionId = sourceSessionId,
// We add additional entropy to add to the initiated side's deduplication seed.
initiationEntropy = additionalEntropy,
initiatorFlowClassName = initiatingSubFlow.classToInitiateWith.name,
flowVersion = initiatingSubFlow.flowInfo.flowVersion,
appName = initiatingSubFlow.flowInfo.appName,

View File

@ -318,7 +318,7 @@ class TopLevelTransition(
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
val newSessions = checkpoint.checkpointState.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong(), null, null, emptyMap(), null, null))
val newSessions = checkpoint.checkpointState.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, null, null, emptyMap(), null, null))
currentState = currentState.copy(checkpoint = checkpoint.setSessions(newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
FlowContinuation.Resume(sessionImpl)

View File

@ -55,7 +55,6 @@ class UnstartedFlowTransition(
} else {
mapOf(0 to DataSessionMessage(initiatingMessage.firstPayload))
},
deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.value}-${initiatingMessage.initiationEntropy}",
otherSideErrored = false,
nextSendingSeqNumber = 1,
lastProcessedSeqNumber = if (initiatingMessage.firstPayload == null) {

View File

@ -574,7 +574,7 @@ class FlowFrameworkTests {
@Test(timeout=300_000)
fun `session init with unknown class is sent to the flow hospital, from where we then drop it`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId.createRandom(SecureRandom()), 0, "not.a.real.Class", 1, "", null), bob)
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId.createRandom(SecureRandom()), "not.a.real.Class", 1, "", null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
@ -590,7 +590,7 @@ class FlowFrameworkTests {
@Test(timeout=300_000)
fun `non-flow class in session init`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId.createRandom(SecureRandom()), 0, String::class.java.name, 1, "", null), bob)
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId.createRandom(SecureRandom()), String::class.java.name, 1, "", null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
@ -1049,7 +1049,7 @@ internal inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow():
}
private fun sanitise(message: SessionMessage) = when (message) {
is InitialSessionMessage -> message.copy(initiatorSessionId = SessionId(BigInteger.valueOf(0)), initiationEntropy = 0, appName = "")
is InitialSessionMessage -> message.copy(initiatorSessionId = SessionId(BigInteger.valueOf(0)), appName = "")
is ExistingSessionMessage -> {
val payload = message.payload
message.copy(
@ -1107,7 +1107,7 @@ internal data class SessionTransfer(val from: Int, val message: SessionMessage,
}
internal fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage {
return InitialSessionMessage(SessionId(BigInteger.valueOf(0)), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize())
return InitialSessionMessage(SessionId(BigInteger.valueOf(0)), clientFlowClass.java.name, flowVersion, "", payload?.serialize())
}
internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(BigInteger.valueOf(0)), DataSessionMessage(payload.serialize()))