FlowException serialised over RPC (subtypes are flattened), and improvement to startFlow RPC for correct exception handling

This commit is contained in:
Shams Asari
2017-02-01 21:27:06 +00:00
parent 56dbf1e844
commit b86c80691e
26 changed files with 261 additions and 143 deletions

View File

@ -6,10 +6,7 @@ package net.corda.core
import com.google.common.base.Function
import com.google.common.base.Throwables
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.*
import kotlinx.support.jdk7.use
import net.corda.core.crypto.newSecureRandom
import org.slf4j.Logger
@ -115,7 +112,7 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
}
}
fun <A> ListenableFuture<A>.toObservable(): Observable<A> {
fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
return Observable.create { subscriber ->
success {
subscriber.onNext(it)
@ -384,15 +381,24 @@ fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
/**
* Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* NoSuchElementException if no items are emitted or any other error thrown by the Observable.
* NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then
* it will unsubscribe from the observable.
*/
fun <T> Observable<T>.toFuture(): ListenableFuture<T> {
val future = SettableFuture.create<T>()
first().subscribe(
{ future.set(it) },
{ future.setException(it) }
)
return future
fun <T> Observable<T>.toFuture(): ListenableFuture<T> = ObservableToFuture(this)
private class ObservableToFuture<T>(observable: Observable<T>) : AbstractFuture<T>(), Observer<T> {
private val subscription = observable.first().subscribe(this)
override fun onNext(value: T) {
set(value)
}
override fun onError(e: Throwable) {
setException(e)
}
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
subscription.unsubscribe()
return super.cancel(mayInterruptIfRunning)
}
override fun onCompleted() {}
}
/** Return the sum of an Iterable of [BigDecimal]s. */

View File

@ -1,5 +1,6 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.CompositeKey
@ -107,15 +108,16 @@ interface CordaRPCOps : RPCOps {
fun uploadFile(dataType: String, name: String?, file: InputStream): String
/**
* Returns the node-local current time.
* Returns the node's current time.
*/
fun currentNodeTime(): Instant
/**
* Returns an Observable emitting a single Unit once the node is registered with the network map.
* Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also
* complete with an exception if it is unable to.
*/
@RPCReturnsObservables
fun waitUntilRegisteredWithNetworkMap(): Observable<Unit>
fun waitUntilRegisteredWithNetworkMap(): ListenableFuture<Unit>
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly.
@ -179,13 +181,10 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow
*
* @param id The started state machine's ID.
* @param progress The stream of progress tracker events.
* @param returnValue An Observable emitting a single event containing the flow's return value.
* To block on this value:
* val returnValue = rpc.startFlow(::MyFlow).returnValue.toBlocking().first()
* @param returnValue A [ListenableFuture] of the flow's return value.
*/
data class FlowHandle<A>(
val id: StateMachineRunId,
val progress: Observable<String>,
// TODO This should be ListenableFuture<A>
val returnValue: Observable<A>
val returnValue: ListenableFuture<A>
)

View File

@ -5,6 +5,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
@ -66,7 +67,7 @@ import kotlin.reflect.jvm.javaType
*/
// A convenient instance of Kryo pre-configured with some useful things. Used as a default by various functions.
val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() }
val THREAD_LOCAL_KRYO: ThreadLocal<Kryo> = ThreadLocal.withInitial { createKryo() }
/**
* A type safe wrapper around a byte array that contains a serialised object. You can call [SerializedBytes.deserialize]
@ -76,7 +77,7 @@ class SerializedBytes<T : Any>(bytes: ByteArray) : OpaqueBytes(bytes) {
// It's OK to use lazy here because SerializedBytes is configured to use the ImmutableClassSerializer.
val hash: SecureHash by lazy { bytes.sha256() }
fun writeToFile(path: Path) = Files.write(path, bytes)
fun writeToFile(path: Path): Path = Files.write(path, bytes)
}
// Some extension functions that make deserialisation convenient and provide auto-casting of the result.
@ -385,8 +386,7 @@ object KotlinObjectSerializer : Serializer<DeserializeAsKotlinObjectDef>() {
return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef
}
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {
}
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {}
}
fun createKryo(k: Kryo = Kryo()): Kryo {
@ -402,12 +402,10 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to
// serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so
// we avoid it here.
register(Kryo::class.java, object : Serializer<Kryo>() {
override fun read(kryo: Kryo, input: Input, type: Class<Kryo>): Kryo {
return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo)
}
override fun write(kryo: Kryo, output: Output, obj: Kryo) {}
})
register(Kryo::class,
read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) },
write = { kryo, output, obj -> }
)
register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer)
register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer)
@ -435,6 +433,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// This ensures a NonEmptySetSerializer is constructed with an initial value.
register(NonEmptySet::class.java, NonEmptySetSerializer)
register(Array<StackTraceElement>::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
/** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */
addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer)
@ -451,6 +451,19 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
}
}
inline fun <T : Any> Kryo.register(
type: KClass<T>,
crossinline read: (Kryo, Input) -> T,
crossinline write: (Kryo, Output, T) -> Unit): Registration {
return register(
type.java,
object : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>): T = read(kryo, input)
override fun write(kryo: Kryo, output: Output, obj: T) = write(kryo, output, obj)
}
)
}
/**
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
@ -517,7 +530,7 @@ var Kryo.attachmentStorage: AttachmentStorage?
//Used in Merkle tree calculation. It doesn't cover all the cases of unstable serialization format.
fun extendKryoHash(kryo: Kryo): Kryo {
return kryo.apply {
setReferences(false)
references = false
register(LinkedHashMap::class.java, MapSerializer())
register(HashMap::class.java, OrderedSerializer)
}

View File

@ -4,6 +4,7 @@ import org.assertj.core.api.Assertions.*
import org.junit.Test
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.CancellationException
class UtilsTest {
@Test
@ -44,4 +45,16 @@ class UtilsTest {
future.getOrThrow()
}.isSameAs(exception)
}
@Test
fun `toFuture - cancel`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
future.cancel(false)
assertThat(subject.hasObservers()).isFalse()
subject.onNext("Hello")
assertThatExceptionOfType(CancellationException::class.java).isThrownBy {
future.get()
}
}
}