CORDA-3565: Port ServiceStateSupport from ENT to OS (#5916)

* CORDA-3565: `ServiceStateSupport` and supporting classes

* CORDA-3565:Plug `ServiceLifecycleSupport` into `MessagingService`

* CORDA-3565: Detekt baseline update

* CORDA-3565: React to MessagingServer going up and addition logging for up/down

Co-authored-by: Matthew Nesbit <matthew.nesbit@r3.com>
This commit is contained in:
Viktor Kolomeyko 2020-02-03 09:47:12 +00:00 committed by GitHub
parent 0c9b41591f
commit 90df56c173
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 167 additions and 36 deletions

View File

@ -1715,7 +1715,7 @@
<ID>TooManyFunctions:LedgerTransaction.kt$LedgerTransaction : FullTransaction</ID>
<ID>TooManyFunctions:LocalSerializerFactory.kt$DefaultLocalSerializerFactory : LocalSerializerFactory</ID>
<ID>TooManyFunctions:LocalTypeInformationBuilder.kt$LocalTypeInformationBuilder</ID>
<ID>TooManyFunctions:MockNodeMessagingService.kt$MockNodeMessagingService : SingletonSerializeAsTokenMessagingService</ID>
<ID>TooManyFunctions:MockNodeMessagingService.kt$MockNodeMessagingService : SingletonSerializeAsTokenMessagingServiceServiceStateSupport</ID>
<ID>TooManyFunctions:NetworkBootstrapper.kt$NetworkBootstrapper : NetworkBootstrapperWithOverridableParameters</ID>
<ID>TooManyFunctions:Node.kt$Node : AbstractNode</ID>
<ID>TooManyFunctions:NodeAttachmentService.kt$NodeAttachmentService : AttachmentStorageInternalSingletonSerializeAsToken</ID>
@ -1724,7 +1724,7 @@
<ID>TooManyFunctions:NodeVaultService.kt$NodeVaultService : SingletonSerializeAsTokenVaultServiceInternal</ID>
<ID>TooManyFunctions:OGSwapPricingExample.kt$SwapPricingExample</ID>
<ID>TooManyFunctions:ObservableUtilities.kt$net.corda.client.jfx.utils.ObservableUtilities.kt</ID>
<ID>TooManyFunctions:P2PMessagingClient.kt$P2PMessagingClient : SingletonSerializeAsTokenMessagingServiceAddressToArtemisQueueResolver</ID>
<ID>TooManyFunctions:P2PMessagingClient.kt$P2PMessagingClient : SingletonSerializeAsTokenMessagingServiceAddressToArtemisQueueResolverServiceStateSupport</ID>
<ID>TooManyFunctions:PathUtils.kt$net.corda.core.internal.PathUtils.kt</ID>
<ID>TooManyFunctions:Perceivable.kt$net.corda.finance.contracts.universal.Perceivable.kt</ID>
<ID>TooManyFunctions:PersistentIdentityService.kt$PersistentIdentityService : SingletonSerializeAsTokenIdentityServiceInternal</ID>

View File

@ -0,0 +1,30 @@
package net.corda.nodeapi.internal.lifecycle
import java.util.concurrent.ConcurrentHashMap
import javax.json.Json
object LifecycleStatusHelper {
private val serviceStatusMap = ConcurrentHashMap<String, Boolean>()
fun setServiceStatus(serviceName: String, active: Boolean) {
serviceStatusMap[serviceName] = active
}
fun getServiceStatus(serviceName: String) = serviceStatusMap.getOrDefault(serviceName, false)
/**
* Return a string copy of a JSON object containing the status of each service,
* and whether this bridge is the master.
*/
fun getServicesStatusReport(isMaster: Boolean): String {
return Json.createObjectBuilder().apply {
val statusList = Json.createArrayBuilder().apply {
serviceStatusMap.forEach { name: String, status: Boolean ->
add(Json.createObjectBuilder().add(name, status).build())
}
}
add("master", isMaster)
add("services", statusList)
}.build().toString()
}
}

View File

@ -0,0 +1,41 @@
package net.corda.nodeapi.internal.lifecycle
import rx.Observable
/**
* Basic interface to represent the dynamic life cycles of services that may be running, but may have to await external dependencies.
* Implementations of this should be implemented in a thread safe fashion.
*/
interface ServiceStateSupport {
/**
* Reads the current dynamic status of the service, which should only become true after the service has been started,
* any dynamic resources have been started/registered and any network connections have been completed.
* Failure to acquire a resource, or manual stop of the service, should return this to false.
*/
val active: Boolean
/**
* This Observer signals changes in the [active] variable, it should not be triggered for events that don't flip the [active] state.
*/
val activeChange: Observable<Boolean>
}
/**
* Simple interface for generic start/stop service lifecycle and the [active] flag indicating runtime ready state.
*/
interface ServiceLifecycleSupport : ServiceStateSupport, AutoCloseable {
/**
* Manual call to allow the service to start the process towards becoming active.
* Note wiring up service dependencies should happen in the constructor phase, unless this is to avoid a circular reference.
* Also, resources allocated as a result of start should be cleaned up as much as possible by stop.
* The [start] method should allow multiple reuse, assuming a [stop] call was made to clear the state.
*/
fun start()
/**
* Release the resources created by [start] and drops the [active] state to false.
*/
fun stop()
override fun close() = stop()
}

View File

@ -0,0 +1,47 @@
package net.corda.nodeapi.internal.lifecycle
import org.slf4j.Logger
import rx.Observable
import rx.subjects.BehaviorSubject
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* Simple implementation of [ServiceStateSupport] service domino logic using RxObservables.
*/
class ServiceStateHelper(private val log: Logger, private val serviceName: String = log.name.split(".").last()) : ServiceStateSupport {
private val lock = ReentrantLock()
// Volatile to prevent deadlocks when locking on read.
@Volatile
private var _active: Boolean = false
override var active: Boolean
get() = _active
set(value) {
lock.withLock {
if (value != _active) {
_active = value
log.info("Status change to $value")
LifecycleStatusHelper.setServiceStatus(serviceName, value)
_activeChange.onNext(value)
}
}
}
private val _activeChange: BehaviorSubject<Boolean> = BehaviorSubject.create<Boolean>(false)
private val _threadSafeObservable: Observable<Boolean> = _activeChange.serialize().distinctUntilChanged()
override val activeChange: Observable<Boolean>
get() = _threadSafeObservable
}
/**
* Simple implementation of [ServiceStateSupport] where it only reports [active] true when a set of dependencies are all [active] true.
*/
class ServiceStateCombiner(val services: List<ServiceStateSupport>) : ServiceStateSupport {
override val active: Boolean
get() = services.all { it.active }
private val _activeChange = Observable.combineLatest(services.map { it.activeChange }, { x -> x.all { y -> y as Boolean } }).serialize().distinctUntilChanged()
override val activeChange: Observable<Boolean>
get() = _activeChange
}

View File

@ -212,7 +212,7 @@ class ArtemisMessagingTest {
startNodeMessagingClient(maxMessageSize = clientMaxMessageSize)
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread(isDaemon = true) { messagingClient.run() }
thread(isDaemon = true) { messagingClient.start() }
return Pair(messagingClient, receivedMessages)
}

View File

@ -57,6 +57,7 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.toFuture
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
@ -316,7 +317,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val contractUpgradeService = ContractUpgradeServiceImpl(cacheFactory).tokenize()
val auditService = DummyAuditService().tokenize()
@Suppress("LeakingThis")
protected val network: MessagingService = makeMessagingService().tokenize()
protected val network: MessagingService = makeMessagingService().tokenize().apply {
activeChange.subscribe({
log.info("MessagingService active change to: $it")
}, {
log.warn("MessagingService subscription error", it)
})
}
val services = ServiceHubInternalImpl().tokenize()
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
@ -535,16 +542,20 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val resultingNodeInfo = createStartedNode(nodeInfo, rpcOps, notaryService).also { _started = it }
val readyFuture = smmStartedFuture.flatMap {
log.debug("SMM ready")
network.ready
network.activeChange.filter { it }.toFuture()
}
resultingNodeInfo to readyFuture
}
readyFuture.map {
// NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB
log.debug("Distributing events")
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStart(nodeServicesContext))
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.StateMachineStarted(nodeServicesContext))
readyFuture.map { ready ->
if (ready) {
// NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB
log.debug("Distributing events")
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStart(nodeServicesContext))
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.StateMachineStarted(nodeServicesContext))
} else {
log.warn("Not distributing events as NetworkMap is not ready")
}
}
return resultingNodeInfo
}

View File

@ -634,7 +634,7 @@ open class Node(configuration: NodeConfiguration,
fun run() {
internalRpcMessagingClient?.start(rpcBroker!!.serverControl)
printBasicNodeInfo("Running P2PMessaging loop")
(network as P2PMessagingClient).run()
(network as P2PMessagingClient).start()
}
private var shutdown = false

View File

@ -13,6 +13,7 @@ import net.corda.core.utilities.ByteSequence
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.nodeapi.internal.lifecycle.ServiceLifecycleSupport
import java.time.Instant
import javax.annotation.concurrent.ThreadSafe
@ -27,7 +28,7 @@ import javax.annotation.concurrent.ThreadSafe
* is *reliable* and as such messages may be stored to disk once queued.
*/
@ThreadSafe
interface MessagingService : AutoCloseable {
interface MessagingService : ServiceLifecycleSupport {
/**
* A unique identifier for this sender that changes whenever a node restarts. This is used in conjunction with a sequence
* number for message de-duplication at the recipient.
@ -107,11 +108,6 @@ interface MessagingService : AutoCloseable {
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
/**
* Signals when ready and fully operational
*/
val ready: CordaFuture<Void?>
}
fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), additionalHeaders: Map<String, String> = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to)

View File

@ -7,8 +7,6 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
@ -42,6 +40,8 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREF
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.lifecycle.ServiceStateHelper
import net.corda.nodeapi.internal.lifecycle.ServiceStateSupport
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.requireMessageSize
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
@ -92,8 +92,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val metricRegistry: MetricRegistry,
cacheFactory: NamedCacheFactory,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver {
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper {
companion object {
private val log = contextLogger()
}
@ -140,13 +141,11 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val handlers = ConcurrentHashMap<String, MessageHandler>()
private val handlersChangedSignal = java.lang.Object()
private val handlersChangedSignal = Object()
private val deduplicator = P2PMessageDeduplicator(cacheFactory, database)
internal var messagingExecutor: MessagingExecutor? = null
override val ready: OpenFuture<Void?> = openFuture()
/**
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
@ -332,7 +331,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*/
fun run() {
override fun start() {
val latch = CountDownLatch(1)
try {
synchronized(handlersChangedSignal) {
@ -355,8 +354,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
p2pConsumer!!
}
consumer.start()
log.debug("Signalling ready")
ready.set(null)
log.debug("Signalling active")
stateHelper.active = true
log.debug("Awaiting on latch")
latch.await()
} finally {
@ -456,12 +455,13 @@ class P2PMessagingClient(val config: NodeConfiguration,
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
*/
fun stop() {
override fun stop() {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(started)
val prevRunning = running
running = false
stateHelper.active = false
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" })
@ -504,8 +504,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
}
override fun close() = stop()
@Suspendable
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
requireMessageSize(message.data.size, maxMessageSize)

View File

@ -5,8 +5,6 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
@ -19,6 +17,8 @@ import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.lifecycle.ServiceStateHelper
import net.corda.nodeapi.internal.lifecycle.ServiceStateSupport
import net.corda.testing.node.InMemoryMessagingNetwork
import java.time.Instant
import java.util.*
@ -28,7 +28,9 @@ import kotlin.concurrent.thread
@ThreadSafe
class MockNodeMessagingService(private val configuration: NodeConfiguration,
private val executor: AffinityExecutor) : SingletonSerializeAsToken(), MessagingService {
private val executor: AffinityExecutor,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : SingletonSerializeAsToken(),
MessagingService, ServiceStateSupport by stateHelper {
private companion object {
private val log = contextLogger()
}
@ -38,8 +40,6 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
@Volatile
private var running = true
override val ready: OpenFuture<Void?> = openFuture()
private inner class InnerState {
val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = LinkedHashSet<InMemoryMessagingNetwork.MessageTransfer>()
@ -58,6 +58,14 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
var spy: MessagingServiceSpy? = null
override fun start() {
throw IllegalAccessException()
}
override fun stop() {
throw IllegalAccessException()
}
/**
* @param manuallyPumped if set to true, then you are expected to call [MockNodeMessagingService.pumpReceive]
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
@ -89,7 +97,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
}
network.addNotaryIdentity(this, notaryService)
ready.set(null)
stateHelper.active = true
}
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {