StateMachineManager is no longer lateinit. (#2123)

This commit is contained in:
Andrzej Cichocki 2017-11-27 17:55:08 +00:00 committed by GitHub
parent 4ca54b73fe
commit 4bd6fef0f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 63 additions and 66 deletions

View File

@ -49,8 +49,7 @@ class IdentitySyncFlowTests {
val alice: Party = aliceNode.info.singleIdentity()
val bob: Party = bobNode.info.singleIdentity()
val notary = mockNet.defaultNotaryIdentity
bobNode.internals.registerInitiatedFlow(Receive::class.java)
bobNode.registerInitiatedFlow(Receive::class.java)
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
val anonymous = true
val ref = OpaqueBytes.of(0x01)
@ -80,8 +79,7 @@ class IdentitySyncFlowTests {
val bob: Party = bobNode.info.singleIdentity()
val charlie: Party = charlieNode.info.singleIdentity()
val notary = mockNet.defaultNotaryIdentity
bobNode.internals.registerInitiatedFlow(Receive::class.java)
bobNode.registerInitiatedFlow(Receive::class.java)
// Charlie issues then pays some cash to a new confidential identity
val anonymous = true
val ref = OpaqueBytes.of(0x01)

View File

@ -38,7 +38,7 @@ public class FlowsInJavaTest {
@Test
public void suspendableActionInsideUnwrap() throws Exception {
bobNode.getInternals().registerInitiatedFlow(SendHelloAndThenReceive.class);
bobNode.registerInitiatedFlow(SendHelloAndThenReceive.class);
Future<String> result = startFlow(aliceNode.getServices(), new SendInUnwrapFlow(bob)).getResultFuture();
mockNet.runNetwork();
assertThat(result.get()).isEqualTo("Hello");

View File

@ -52,10 +52,8 @@ class AttachmentTests {
val bobNode = mockNet.createPartyNode(BOB.name)
val alice = aliceNode.info.singleIdentity()
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
// Insert an attachment into node zero's store directly.
val id = aliceNode.database.transaction {
aliceNode.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
@ -85,10 +83,8 @@ class AttachmentTests {
fun `missing`() {
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
val alice = aliceNode.info.singleIdentity()
@ -108,10 +104,8 @@ class AttachmentTests {
})
val bobNode = mockNet.createNode(MockNodeParameters(legalName = BOB.name))
val alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
val attachment = fakeAttachment()
// Insert an attachment into node zero's store directly.
val id = aliceNode.database.transaction {

View File

@ -50,7 +50,7 @@ class CollectSignaturesFlowTests {
private fun registerFlowOnAllNodes(flowClass: KClass<out FlowLogic<*>>) {
listOf(aliceNode, bobNode, charlieNode).forEach {
it.internals.registerInitiatedFlow(flowClass.java)
it.registerInitiatedFlow(flowClass.java)
}
}

View File

@ -38,14 +38,14 @@ class NoAnswer(private val closure: () -> Unit = {}) : FlowLogic<Unit>() {
* Allows to register a flow of type [R] against an initiating flow of type [I].
*/
inline fun <I : FlowLogic<*>, reified R : FlowLogic<*>> StartedNode<*>.registerInitiatedFlow(initiatingFlowType: KClass<I>, crossinline construct: (session: FlowSession) -> R) {
internals.internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
}
/**
* Allows to register a flow of type [Answer] against an initiating flow of type [I], returning a valure of type [R].
*/
inline fun <I : FlowLogic<*>, reified R : Any> StartedNode<*>.registerAnswer(initiatingFlowType: KClass<I>, value: R) {
internals.internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
}
/**

View File

@ -42,8 +42,8 @@ class ResolveTransactionsFlowTest {
notaryNode = mockNet.defaultNotaryNode
megaCorpNode = mockNet.createPartyNode(MEGA_CORP.name)
miniCorpNode = mockNet.createPartyNode(MINI_CORP.name)
megaCorpNode.internals.registerInitiatedFlow(TestResponseFlow::class.java)
miniCorpNode.internals.registerInitiatedFlow(TestResponseFlow::class.java)
megaCorpNode.registerInitiatedFlow(TestResponseFlow::class.java)
miniCorpNode.registerInitiatedFlow(TestResponseFlow::class.java)
notary = mockNet.defaultNotaryIdentity
megaCorp = megaCorpNode.info.singleIdentity()
miniCorp = miniCorpNode.info.singleIdentity()

View File

@ -148,7 +148,7 @@ class AttachmentSerializationTest {
}
private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) {
server.internals.internalRegisterFlowFactory(
server.internalRegisterFlowFactory(
ClientLogic::class.java,
InitiatedFlowFactory.Core { ServerLogic(it, sendData) },
ServerLogic::class.java,

View File

@ -28,7 +28,7 @@ class CustomVaultQueryTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance", "net.corda.docs"))
nodeA = mockNet.createPartyNode()
nodeB = mockNet.createPartyNode()
nodeA.internals.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
nodeA.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
notary = mockNet.defaultNotaryIdentity
}

View File

@ -27,7 +27,7 @@ class FxTransactionBuildTutorialTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance"))
nodeA = mockNet.createPartyNode()
nodeB = mockNet.createPartyNode()
nodeB.internals.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java)
nodeB.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java)
notary = mockNet.defaultNotaryIdentity
}

View File

@ -35,7 +35,7 @@ class WorkflowTransactionBuildTutorialTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.docs"))
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
aliceNode.internals.registerInitiatedFlow(RecordCompletionFlow::class.java)
aliceNode.registerInitiatedFlow(RecordCompletionFlow::class.java)
aliceServices = aliceNode.services
bobServices = bobNode.services
alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)

View File

@ -213,7 +213,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode(BOB.name)
bob.internals.registerInitiatedFlow(ReceiveFlow::class.java)
bob.registerInitiatedFlow(ReceiveFlow::class.java)
val bobParty = bob.info.chooseIdentity()
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()

View File

@ -124,7 +124,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private lateinit var _services: ServiceHubInternalImpl
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var network: MessagingService
@ -153,7 +152,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
@Volatile private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence): CordaRPCOps {
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
}
@ -191,7 +190,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val stateLoader = StateLoaderImpl(transactionStorage)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database, info, identityService)
val notaryService = makeNotaryService(nodeServices, database)
smm = makeStateMachineManager(database)
val smm = makeStateMachineManager(database)
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(
platformClock,
@ -208,13 +207,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
makeVaultObservers(schedulerService, database.hibernateConfig)
val rpcOps = makeRPCOps(flowStarter, database)
makeVaultObservers(schedulerService, database.hibernateConfig, smm)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
tokenizableServices = nodeServices + cordaServices + schedulerService
registerCordappFlows()
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
startShell(rpcOps)
@ -397,11 +396,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
installCoreFlow(NotaryFlow.Client::class, service::createServiceFlow)
}
private fun registerCordappFlows() {
private fun registerCordappFlows(smm: StateMachineManager) {
cordappLoader.cordapps.flatMap { it.initiatedFlows }
.forEach {
try {
registerInitiatedFlowInternal(it, track = false)
registerInitiatedFlowInternal(smm, it, track = false)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as an initiated flow, must have a constructor with a single parameter " +
"of type ${Party::class.java.name}")
@ -411,13 +410,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
/**
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].
* @return An [Observable] of the initiated flows started by counter-parties.
*/
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>): Observable<T> {
return registerInitiatedFlowInternal(initiatedFlowClass, track = true)
internal fun <T : FlowLogic<*>> registerInitiatedFlow(smm: StateMachineManager, initiatedFlowClass: Class<T>): Observable<T> {
return registerInitiatedFlowInternal(smm, initiatedFlowClass, track = true)
}
// TODO remove once not needed
@ -426,7 +420,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"It should accept a ${FlowSession::class.java.simpleName} instead"
}
private fun <F : FlowLogic<*>> registerInitiatedFlowInternal(initiatedFlow: Class<F>, track: Boolean): Observable<F> {
private fun <F : FlowLogic<*>> registerInitiatedFlowInternal(smm: StateMachineManager, initiatedFlow: Class<F>, track: Boolean): Observable<F> {
val constructors = initiatedFlow.declaredConstructors.associateBy { it.parameterTypes.toList() }
val flowSessionCtor = constructors[listOf(FlowSession::class.java)]?.apply { isAccessible = true }
val ctor: (FlowSession) -> F = if (flowSessionCtor == null) {
@ -447,16 +441,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, initiatedFlow.appName, ctor)
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
val observable = internalRegisterFlowFactory(smm, initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable
}
@VisibleForTesting
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
internal fun <F : FlowLogic<*>> internalRegisterFlowFactory(smm: StateMachineManager,
initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
val observable = if (track) {
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
} else {
@ -519,7 +513,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration) {
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig)

View File

@ -2,6 +2,8 @@ package net.corda.node.internal
import net.corda.core.contracts.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.internal.VisibleForTesting
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.StateLoader
@ -13,6 +15,7 @@ import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import rx.Observable
interface StartedNode<out N : AbstractNode> {
val internals: N
@ -26,7 +29,20 @@ interface StartedNode<out N : AbstractNode> {
val rpcOps: CordaRPCOps
val notaryService: NotaryService?
fun dispose() = internals.stop()
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(initiatedFlowClass)
/**
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].
* @return An [Observable] of the initiated flows started by counter-parties.
*/
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(smm, initiatedFlowClass)
@VisibleForTesting
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
return internals.internalRegisterFlowFactory(smm, initiatingFlowClass, flowFactory, initiatedFlowClass, track)
}
}
class StateLoaderImpl(private val validatedTransactions: TransactionStorage) : StateLoader {

View File

@ -519,7 +519,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
sellerNode: StartedNode<MockNetwork.MockNode>,
buyerNode: StartedNode<MockNetwork.MockNode>,
assetToSell: StateAndRef<OwnableState>): RunResult {
val buyerFlows: Observable<out FlowLogic<*>> = buyerNode.internals.registerInitiatedFlow(BuyerAcceptor::class.java)
val buyerFlows: Observable<out FlowLogic<*>> = buyerNode.registerInitiatedFlow(BuyerAcceptor::class.java)
val firstBuyerFiber = buyerFlows.toFuture().map { it.stateMachine }
val seller = SellerInitiator(buyer, notary, assetToSell, 1000.DOLLARS, anonymous)
val sellerResult = sellerNode.services.startFlow(seller).resultFuture

View File

@ -672,7 +672,7 @@ class FlowFrameworkTests {
initiatingFlowClass: KClass<out FlowLogic<*>>,
initiatedFlowVersion: Int = 1,
noinline flowFactory: (FlowSession) -> P): CordaFuture<P> {
val observable = internals.internalRegisterFlowFactory(
val observable = internalRegisterFlowFactory(
initiatingFlowClass.java,
InitiatedFlowFactory.CorDapp(initiatedFlowVersion, "", flowFactory),
P::class.java,

View File

@ -66,7 +66,7 @@ class NodePair(private val mockNet: MockNetwork) {
private set
fun <T> communicate(clientLogic: AbstractClientLogic<T>, rebootClient: Boolean): FlowStateMachine<T> {
server.internals.internalRegisterFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false)
server.internalRegisterFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false)
client.services.startFlow(clientLogic)
while (!serverRunning.get()) mockNet.runNetwork(1)
if (rebootClient) {

View File

@ -209,8 +209,8 @@ class NodeInterestRatesTest {
val mockNet = MockNetwork(cordappPackages = listOf("net.corda.finance.contracts", "net.corda.irs"))
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val oracleNode = mockNet.createNode(MockNodeParameters(legalName = BOB_NAME)).apply {
internals.registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java)
internals.registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java)
registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java)
registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java)
database.transaction {
services.cordaService(NodeInterestRates.Oracle::class.java).knownFixes = TEST_DATA
}

View File

@ -136,10 +136,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
.replace("oracleXXX", RatesOracleNode.RATES_SERVICE_NAME.toString()))
irs.fixedLeg.fixedRatePayer = node1.info.chooseIdentity()
irs.floatingLeg.floatingRatePayer = node2.info.chooseIdentity()
node1.internals.registerInitiatedFlow(FixingFlow.Fixer::class.java)
node2.internals.registerInitiatedFlow(FixingFlow.Fixer::class.java)
node1.registerInitiatedFlow(FixingFlow.Fixer::class.java)
node2.registerInitiatedFlow(FixingFlow.Fixer::class.java)
@InitiatingFlow
class StartDealFlow(val otherParty: Party,
val payload: AutoOffer) : FlowLogic<SignedTransaction>() {
@ -152,9 +150,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
@InitiatedBy(StartDealFlow::class)
class AcceptDealFlow(otherSession: FlowSession) : Acceptor(otherSession)
val acceptDealFlows: Observable<AcceptDealFlow> = node2.internals.registerInitiatedFlow(AcceptDealFlow::class.java)
val acceptDealFlows: Observable<AcceptDealFlow> = node2.registerInitiatedFlow(AcceptDealFlow::class.java)
val acceptorTxFuture = acceptDealFlows.toFuture().toCompletableFuture().thenCompose {
uncheckedCast<FlowStateMachine<*>, FlowStateMachine<SignedTransaction>>(it.stateMachine).resultFuture.toCompletableFuture()
}

View File

@ -38,8 +38,7 @@ class TraderDemoTest {
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = listOf(demoUser)),
startNode(providedName = BOC.name, rpcUsers = listOf(bankUser))
).map { (it.getOrThrow() as NodeHandle.InProcess).node }
nodeA.internals.registerInitiatedFlow(BuyerFlow::class.java)
nodeA.registerInitiatedFlow(BuyerFlow::class.java)
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {
val client = CordaRPCClient(it.internals.configuration.rpcAddress!!)
client.start(demoUser.username, demoUser.password).proxy

View File

@ -291,7 +291,7 @@ class FlowStackSnapshotTest {
fun `flowStackSnapshot object is serializable`() {
val mockNet = MockNetwork(threadPerNode = true)
val node = mockNet.createPartyNode()
node.internals.registerInitiatedFlow(DummyFlow::class.java)
node.registerInitiatedFlow(DummyFlow::class.java)
node.services.startFlow(FlowStackSnapshotSerializationTestingFlow()).resultFuture.get()
val thrown = try {
mockNet.stopNodes()