This commit is contained in:
Andras Slemmer 2018-04-23 13:28:34 +01:00
parent 5b4fd6fe64
commit 6bf34ed5c7
3 changed files with 3 additions and 18 deletions

View File

@ -185,20 +185,6 @@ abstract class FlowLogic<out T> {
return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload) return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload)
} }
/**
* Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
*
* Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead.
* It is only intended for the case where the [otherParty] is running a distributed service with an idempotent
* flow which only accepts a single request and sends back a single response e.g. a notary or certain types of
* oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
* to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
*/
@Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING)
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).sendAndReceiveWithRetry(payload)
}
/** /**
* Suspends until the specified [otherParty] sends us a message of type [R]. * Suspends until the specified [otherParty] sends us a message of type [R].
* *

View File

@ -96,7 +96,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return txStorage.locked { return txStorage.locked {
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.wrapWithDatabaseTransaction()) DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -297,7 +297,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R { override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {
val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext)) val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext))
val transaction = extractThreadLocalTransaction() val transaction = extractThreadLocalTransaction()
val transitionExecutor = TransientReference(getTransientField(TransientValues::transitionExecutor))
parkAndSerialize { _, _ -> parkAndSerialize { _, _ ->
logger.trace { "Suspended on $ioRequest" } logger.trace { "Suspended on $ioRequest" }
@ -312,8 +311,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.Error(throwable) Event.Error(throwable)
} }
// We must commit the database transaction before returning from this closure, otherwise Quasar may schedule // We must commit the database transaction before returning from this closure otherwise Quasar may schedule
// other fibers // other fibers, so we process the event immediately
val continuation = processEventImmediately( val continuation = processEventImmediately(
event, event,
isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnEntry = true,