Simplified the implementation of the tx query APIs with the introduction of Class.castIfPossible

This commit is contained in:
Shams Asari
2017-07-21 15:38:03 +01:00
parent a485bbada8
commit 3e199e51fc
8 changed files with 129 additions and 133 deletions

View File

@ -1,5 +1,6 @@
package net.corda.node.services.database
import net.corda.core.internal.castIfPossible
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.SchemaService
@ -100,17 +101,14 @@ class HibernateConfiguration(val schemaService: SchemaService, val useDefaultLog
override fun supportsAggressiveRelease(): Boolean = true
override fun getConnection(): Connection =
DatabaseTransactionManager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
try {
return unwrapType.cast(this)
} catch(e: ClassCastException) {
throw UnknownUnwrapTypeException(unwrapType)
}
override fun getConnection(): Connection {
return DatabaseTransactionManager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection
}
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = (unwrapType == NodeDatabaseConnectionProvider::class.java)
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
return unwrapType.castIfPossible(this) ?: throw UnknownUnwrapTypeException(unwrapType)
}
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = unwrapType == NodeDatabaseConnectionProvider::class.java
}
}

View File

@ -4,6 +4,7 @@ import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.castIfPossible
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.UntrustworthyData
@ -42,10 +43,7 @@ data class ErrorSessionEnd(override val recipientSessionId: Long, val errorRespo
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M)
fun <T> ReceivedSessionMessage<SessionData>.checkPayloadIs(type: Class<T>): UntrustworthyData<T> {
if (type.isInstance(message.payload)) {
return UntrustworthyData(type.cast(message.payload))
} else {
throw UnexpectedFlowEndException("We were expecting a ${type.name} from $sender but we instead got a " +
"${message.payload.javaClass.name} (${message.payload})")
}
return type.castIfPossible(message.payload)?.let { UntrustworthyData(it) } ?:
throw UnexpectedFlowEndException("We were expecting a ${type.name} from $sender but we instead got a " +
"${message.payload.javaClass.name} (${message.payload})")
}

View File

@ -17,6 +17,7 @@ import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.castIfPossible
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT
@ -145,10 +146,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@Suppress("UNCHECKED_CAST")
return mutex.locked {
stateMachines.keys
.map { it.logic }
.filterIsInstance(flowClass)
.map { it to (it.stateMachine as FlowStateMachineImpl<T>).resultFuture }
stateMachines.keys.mapNotNull {
flowClass.castIfPossible(it.logic)?.let { it to (it.stateMachine as FlowStateMachineImpl<T>).resultFuture }
}
}
}
@ -380,7 +380,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun deserializeFiber(checkpoint: Checkpoint, logger: Logger): FlowStateMachineImpl<*>? {
return try {
checkpoint.serializedFiber.deserialize<FlowStateMachineImpl<*>>(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext)).apply { fromCheckpoint = true }
checkpoint.serializedFiber.deserialize(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext)).apply { fromCheckpoint = true }
} catch (t: Throwable) {
logger.error("Encountered unrestorable checkpoint!", t)
null