cherry-pick 7759fdbb71ea9b2021afd8af0ac05447c5305b3a

This commit is contained in:
Chris Burlinchon 2018-04-16 18:05:01 +01:00 committed by Andras Slemmer
parent c4ceca3787
commit b0d2a258c0
7 changed files with 4 additions and 53 deletions

View File

@ -242,7 +242,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(serverThread, smm, flowLogicRefFactory)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
val schedulerService = NodeSchedulerService(
platformClock,
database,
@ -893,7 +893,7 @@ internal fun logVendorString(database: CordaPersistence, log: Logger) {
}
}
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
internal class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext, deduplicationHandler: DeduplicationHandler?): CordaFuture<FlowStateMachine<T>> {
return smm.startFlow(logic, context, ourIdentity = null, deduplicationHandler = deduplicationHandler)
}

View File

@ -77,8 +77,7 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
* Instantiate RPCSecurityManager initialised with users data from a list of [User]
*/
fun fromUserList(id: AuthServiceId, users: List<User>) =
RPCSecurityManagerImpl(
AuthServiceConfig.fromUsers(users).copy(id = id))
RPCSecurityManagerImpl(AuthServiceConfig.fromUsers(users).copy(id = id))
// Build internal Shiro securityManager instance
private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager {

View File

@ -178,29 +178,6 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
/**
* Stop scheduler service.
*/
fun stop() {
mutex.locked {
schedulerTimerExecutor.shutdown()
scheduledStatesQueue.clear()
scheduledStates.clear()
}
}
/**
* Resume scheduler service after having called [stop].
*/
fun resume() {
mutex.locked {
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
scheduledStates.putAll(createMap())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref]
@ -240,7 +217,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private var schedulerTimerExecutor = Executors.newSingleThreadExecutor()
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new

View File

@ -5,7 +5,6 @@ import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
@ -333,8 +332,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1)
var runningFuture = openFuture<Unit>()
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*/
@ -345,7 +342,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
runningFuture.set(Unit)
// If it's null, it means we already called stop, so return immediately.
if (p2pConsumer == null) {
return
@ -457,7 +453,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started)
val prevRunning = running
running = false
runningFuture = openFuture()
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" })

View File

@ -134,20 +134,6 @@ class SingleThreadedStateMachineManager(
}
}
override fun resume() {
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
mutex.locked {
stopping = false
}
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
flows.values.mapNotNull {

View File

@ -39,11 +39,6 @@ interface StateMachineManager {
*/
fun stop(allowedUnsuspendedFiberCount: Int)
/**
* Resume state machine manager after having called [stop].
*/
fun resume()
/**
* Starts a new flow.
*

View File

@ -89,7 +89,6 @@ class FiberDeserializationChecker {
fun stop(): Boolean {
jobQueue.add(Job.Finish)
checkerThread?.join()
checkerThread = null
return foundUnrestorableFibers
}
}