Added @Suspendable to UntrustworthyData.unwrap to allow Java flows to do I/O inside unwrap

This commit is contained in:
Shams Asari 2017-03-15 11:18:14 +00:00
parent 391270ed71
commit 51a5eb77a0
2 changed files with 77 additions and 0 deletions

View File

@ -1,5 +1,6 @@
package net.corda.core.utilities package net.corda.core.utilities
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
/** /**
@ -18,6 +19,7 @@ class UntrustworthyData<out T>(private val fromUntrustedWorld: T) {
@Deprecated("Accessing the untrustworthy data directly without validating it first is a bad idea") @Deprecated("Accessing the untrustworthy data directly without validating it first is a bad idea")
get() = fromUntrustedWorld get() = fromUntrustedWorld
@Suspendable
@Throws(FlowException::class) @Throws(FlowException::class)
fun <R> unwrap(validator: Validator<T, R>) = validator.validate(fromUntrustedWorld) fun <R> unwrap(validator: Validator<T, R>) = validator.validate(fromUntrustedWorld)
@ -26,6 +28,7 @@ class UntrustworthyData<out T>(private val fromUntrustedWorld: T) {
inline fun <R> validate(validator: (T) -> R) = validator(data) inline fun <R> validate(validator: (T) -> R) = validator(data)
interface Validator<in T, out R> { interface Validator<in T, out R> {
@Suspendable
@Throws(FlowException::class) @Throws(FlowException::class)
fun validate(data: T): R fun validate(data: T): R
} }

View File

@ -0,0 +1,74 @@
package net.corda.core.flows;
import co.paralleluniverse.fibers.Suspendable;
import net.corda.core.crypto.Party;
import net.corda.testing.node.MockNetwork;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.Future;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public class FlowsInJavaTest {
private final MockNetwork net = new MockNetwork();
private MockNetwork.MockNode node1;
private MockNetwork.MockNode node2;
@Before
public void setUp() {
MockNetwork.BasketOfNodes someNodes = net.createSomeNodes(2);
node1 = someNodes.getPartyNodes().get(0);
node2 = someNodes.getPartyNodes().get(1);
net.runNetwork();
}
@After
public void cleanUp() {
net.stopNodes();
}
@Test
public void suspendableActionInsideUnwrap() throws Exception {
node2.getServices().registerFlowInitiator(SendInUnwrapFlow.class, (otherParty) -> new OtherFlow(otherParty, "Hello"));
Future<String> result = node1.getServices().startFlow(new SendInUnwrapFlow(node2.getInfo().getLegalIdentity())).getResultFuture();
net.runNetwork();
assertThat(result.get()).isEqualTo("Hello");
}
private static class SendInUnwrapFlow extends FlowLogic<String> {
private final Party otherParty;
private SendInUnwrapFlow(Party otherParty) {
this.otherParty = otherParty;
}
@Suspendable
@Override
public String call() throws FlowException {
return receive(String.class, otherParty).unwrap(data -> {
send(otherParty, "Something");
return data;
});
}
}
private static class OtherFlow extends FlowLogic<String> {
private final Party otherParty;
private final String payload;
private OtherFlow(Party otherParty, String payload) {
this.otherParty = otherParty;
this.payload = payload;
}
@Suspendable
@Override
public String call() throws FlowException {
return sendAndReceive(String.class, otherParty, payload).unwrap(data -> data);
}
}
}