mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
State machine rewrite
This commit is contained in:
@ -215,3 +215,15 @@ fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: CordaPersistence?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun parserTransactionIsolationLevel(property: String?): Int =
|
||||
when (property) {
|
||||
"none" -> Connection.TRANSACTION_NONE
|
||||
"readUncommitted" -> Connection.TRANSACTION_READ_UNCOMMITTED
|
||||
"readCommitted" -> Connection.TRANSACTION_READ_COMMITTED
|
||||
"repeatableRead" -> Connection.TRANSACTION_REPEATABLE_READ
|
||||
"serializable" -> Connection.TRANSACTION_SERIALIZABLE
|
||||
else -> {
|
||||
Connection.TRANSACTION_REPEATABLE_READ
|
||||
}
|
||||
}
|
||||
|
@ -14,11 +14,15 @@ class DatabaseTransaction(
|
||||
) {
|
||||
val id: UUID = UUID.randomUUID()
|
||||
|
||||
private var _connectionCreated = false
|
||||
val connectionCreated get() = _connectionCreated
|
||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
cordaPersistence.dataSource.connection.apply {
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
cordaPersistence.dataSource.connection
|
||||
.apply {
|
||||
_connectionCreated = true
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
}
|
||||
|
||||
private val sessionDelegate = lazy {
|
||||
@ -30,20 +34,22 @@ class DatabaseTransaction(
|
||||
val session: Session by sessionDelegate
|
||||
private lateinit var hibernateTransaction: Transaction
|
||||
|
||||
private val outerTransaction: DatabaseTransaction? = threadLocal.get()
|
||||
val outerTransaction: DatabaseTransaction? = threadLocal.get()
|
||||
|
||||
fun commit() {
|
||||
if (sessionDelegate.isInitialized()) {
|
||||
hibernateTransaction.commit()
|
||||
}
|
||||
connection.commit()
|
||||
if (_connectionCreated) {
|
||||
connection.commit()
|
||||
}
|
||||
}
|
||||
|
||||
fun rollback() {
|
||||
if (sessionDelegate.isInitialized() && session.isOpen) {
|
||||
session.clear()
|
||||
}
|
||||
if (!connection.isClosed) {
|
||||
if (_connectionCreated && !connection.isClosed) {
|
||||
connection.rollback()
|
||||
}
|
||||
}
|
||||
@ -52,7 +58,9 @@ class DatabaseTransaction(
|
||||
if (sessionDelegate.isInitialized() && session.isOpen) {
|
||||
session.close()
|
||||
}
|
||||
connection.close()
|
||||
if (_connectionCreated) {
|
||||
connection.close()
|
||||
}
|
||||
threadLocal.set(outerTransaction)
|
||||
if (outerTransaction == null) {
|
||||
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
|
||||
|
@ -3,7 +3,7 @@ package net.corda.nodeapi.internal.serialization
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.util.DefaultClassResolver
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput
|
||||
import net.corda.nodeapi.internal.serialization.amqp.Envelope
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
@ -47,17 +47,17 @@ class ListsSerializationTest {
|
||||
@Test
|
||||
fun `check list can be serialized as part of SessionData`() {
|
||||
run {
|
||||
val sessionData = SessionData(123, listOf(1).serialize())
|
||||
val sessionData = DataSessionMessage(listOf(1).serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(listOf(1), sessionData.payload.deserialize())
|
||||
}
|
||||
run {
|
||||
val sessionData = SessionData(123, listOf(1, 2).serialize())
|
||||
val sessionData = DataSessionMessage(listOf(1, 2).serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(listOf(1, 2), sessionData.payload.deserialize())
|
||||
}
|
||||
run {
|
||||
val sessionData = SessionData(123, emptyList<Int>().serialize())
|
||||
val sessionData = DataSessionMessage(emptyList<Int>().serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(emptyList<Int>(), sessionData.payload.deserialize())
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.amqpSpecific
|
||||
@ -41,7 +41,7 @@ class MapsSerializationTest {
|
||||
|
||||
@Test
|
||||
fun `check list can be serialized as part of SessionData`() {
|
||||
val sessionData = SessionData(123, smallMap.serialize())
|
||||
val sessionData = DataSessionMessage(smallMap.serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(smallMap, sessionData.payload.deserialize())
|
||||
}
|
||||
|
@ -4,10 +4,10 @@ import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.util.DefaultClassResolver
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.kryoSpecific
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.kryoSpecific
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Rule
|
||||
@ -34,17 +34,17 @@ class SetsSerializationTest {
|
||||
@Test
|
||||
fun `check set can be serialized as part of SessionData`() {
|
||||
run {
|
||||
val sessionData = SessionData(123, setOf(1).serialize())
|
||||
val sessionData = DataSessionMessage(setOf(1).serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(setOf(1), sessionData.payload.deserialize())
|
||||
}
|
||||
run {
|
||||
val sessionData = SessionData(123, setOf(1, 2).serialize())
|
||||
val sessionData = DataSessionMessage(setOf(1, 2).serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(setOf(1, 2), sessionData.payload.deserialize())
|
||||
}
|
||||
run {
|
||||
val sessionData = SessionData(123, emptySet<Int>().serialize())
|
||||
val sessionData = DataSessionMessage(emptySet<Int>().serialize())
|
||||
assertEqualAfterRoundTripSerialization(sessionData)
|
||||
assertEquals(emptySet<Int>(), sessionData.payload.deserialize())
|
||||
}
|
||||
|
Reference in New Issue
Block a user