Replacing the two params of AbstractNodeService with single ServiceHubInternal

This commit is contained in:
Shams Asari 2016-09-13 11:02:39 +01:00
parent 53de66a23d
commit f314bab6c8
11 changed files with 37 additions and 61 deletions

View File

@ -338,15 +338,15 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
open protected fun makeNetworkMapService() {
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires)
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
inNodeNetworkMapService = InMemoryNetworkMapService(services, reg)
}
open protected fun makeNotaryService(type: ServiceType): NotaryService {
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
return when (type) {
SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider!!, services.networkMapCache)
ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider!!, services.networkMapCache)
SimpleNotaryService.Type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider!!)
ValidatingNotaryService.Type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider!!)
else -> {
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
}
@ -374,7 +374,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// TODO: sort out ordering of open & protected modifiers of functions in this class.
protected open fun makeWalletService(): WalletService = NodeWalletService(services)
protected open fun makeWalletMonitorService(): WalletMonitorService = WalletMonitorService(net, smm, services)
protected open fun makeWalletMonitorService(): WalletMonitorService = WalletMonitorService(services, smm)
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the

View File

@ -17,7 +17,7 @@ object NotaryChange {
* A service that monitors the network for requests for changing the notary of a state,
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
*/
class Service(val services: ServiceHubInternal) : AbstractNodeService(services.networkService, services.networkMapCache) {
class Service(services: ServiceHubInternal) : AbstractNodeService(services) {
init {
addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }

View File

@ -1,9 +1,7 @@
package com.r3corda.node.services.api
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
@ -14,7 +12,9 @@ import javax.annotation.concurrent.ThreadSafe
* Abstract superclass for services that a node can host, which provides helper functions.
*/
@ThreadSafe
abstract class AbstractNodeService(val net: MessagingService, val networkMapCache: NetworkMapCache) : SingletonSerializeAsToken() {
abstract class AbstractNodeService(val services: ServiceHubInternal) : SingletonSerializeAsToken() {
val net: MessagingServiceInternal get() = services.networkService
/**
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
@ -36,7 +36,7 @@ abstract class AbstractNodeService(val net: MessagingService, val networkMapCach
// If the return type R is Unit, then do not send a response
if (response.javaClass != Unit.javaClass) {
val msg = net.createMessage(topic, request.sessionID, response.serialize().bits)
net.send(msg, request.getReplyTo(networkMapCache))
net.send(msg, request.getReplyTo(services.networkMapCache))
}
} catch(e: Exception) {
exceptionConsumer(message, e)

View File

@ -7,8 +7,6 @@ import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.protocols.ProtocolLogic
@ -17,6 +15,7 @@ import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.TransactionBuilder
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.BroadcastTransactionProtocol
@ -39,8 +38,7 @@ import javax.annotation.concurrent.ThreadSafe
// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input
// to a monitoring tool)
@ThreadSafe
class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, val services: ServiceHub)
: AbstractNodeService(net, services.networkMapCache) {
class WalletMonitorService(services: ServiceHubInternal, val smm: StateMachineManager) : AbstractNodeService(services) {
companion object {
val REGISTER_TOPIC = "platform.wallet_monitor.register"
val DEREGISTER_TOPIC = "platform.wallet_monitor.deregister"
@ -149,8 +147,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
}
private fun notifyEvent(event: ServiceToClientEvent) = listeners.forEach { monitor ->
net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits),
monitor.recipients)
net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits), monitor.recipients)
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service

View File

@ -7,7 +7,6 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SignedData
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
@ -16,11 +15,12 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.ServiceRequestMessage
import kotlinx.support.jdk8.collections.compute
import org.slf4j.LoggerFactory
import java.security.PrivateKey
import java.security.SignatureException
import java.time.Instant
@ -60,7 +60,7 @@ interface NetworkMapService {
// Base topic for messages acknowledging pushed updates
val PUSH_ACK_PROTOCOL_TOPIC = "platform.network_map.push_ack"
val logger = LoggerFactory.getLogger(NetworkMapService::class.java)
val logger = loggerFor<NetworkMapService>()
}
val nodes: List<NodeInfo>
@ -82,8 +82,7 @@ interface NetworkMapService {
}
@ThreadSafe
class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, cache: NetworkMapCache) :
AbstractNetworkMapService(net, cache) {
class InMemoryNetworkMapService(services: ServiceHubInternal, home: NodeRegistration) : AbstractNetworkMapService(services) {
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
@ -100,7 +99,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, c
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
*/
@ThreadSafe
abstract class AbstractNetworkMapService(net: MessagingService, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net, cache) {
abstract class AbstractNetworkMapService(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) {
protected abstract val registeredNodes: MutableMap<Party, NodeRegistrationInfo>
// Map from subscriber address, to most recently acknowledged update map version.
@ -277,11 +276,11 @@ abstract class AbstractNetworkMapService(net: MessagingService, val cache: Netwo
when (change.type) {
AddOrRemove.ADD -> {
NetworkMapService.logger.info("Added node ${node.address} to network map")
cache.addNode(change.node)
services.networkMapCache.addNode(change.node)
}
AddOrRemove.REMOVE -> {
NetworkMapService.logger.info("Removed node ${node.address} from network map")
cache.removeNode(change.node)
services.networkMapCache.removeNode(change.node)
}
}

View File

@ -1,19 +1,15 @@
package com.r3corda.node.services.persistence
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.success
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
@ -43,7 +39,7 @@ object DataVending {
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both
// should be fixed at the same time.
class Service(val services: ServiceHubInternal) : AbstractNodeService(services.networkService, services.networkMapCache) {
class Service(services: ServiceHubInternal) : AbstractNodeService(services) {
companion object {
val logger = loggerFor<DataVending.Service>()

View File

@ -1,29 +1,25 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.NotaryProtocol
/**
* A Notary service acts as the final signer of a transaction ensuring two things:
* - The (optional) timestamp of the transaction is valid.
* - None of the referenced input states have previously been consumed by a transaction signed by this Notary
*
*O
* A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp).
*
* This is the base implementation that can be customised with specific Notary transaction commit protocol.
*/
abstract class NotaryService(val smm: StateMachineManager,
net: MessagingService,
abstract class NotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider,
networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
val uniquenessProvider: UniquenessProvider) : AbstractNodeService(services) {
// Do not specify this as an advertised service. Use a concrete implementation.
// TODO: We do not want a service type that cannot be used. Fix the type system abuse here.
object Type : ServiceType("corda.notary")
@ -46,7 +42,7 @@ abstract class NotaryService(val smm: StateMachineManager,
req.sendSessionID,
timestampChecker,
uniquenessProvider)
smm.add(NotaryProtocol.TOPIC, protocol)
services.startProtocol(NotaryProtocol.TOPIC, protocol)
return Ack
}
}

View File

@ -1,21 +1,16 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.NotaryProtocol
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(
smm: StateMachineManager,
net: MessagingService,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider,
networkMapCache: NetworkMapCache) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) {
class SimpleNotaryService(services: ServiceHubInternal,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider) : NotaryService(services, timestampChecker, uniquenessProvider) {
object Type : ServiceType("corda.notary.simple")
override val logger = loggerFor<SimpleNotaryService>()

View File

@ -1,24 +1,18 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.NotaryProtocol
import com.r3corda.protocols.ValidatingNotaryProtocol
/** A Notary service that validates the transaction chain of he submitted transaction before committing it */
class ValidatingNotaryService(
smm: StateMachineManager,
net: MessagingService,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider,
networkMapCache: NetworkMapCache
) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) {
class ValidatingNotaryService(services: ServiceHubInternal,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider) : NotaryService(services, timestampChecker, uniquenessProvider) {
object Type : ServiceType("corda.notary.validating")
override val logger = loggerFor<ValidatingNotaryService>()

View File

@ -55,7 +55,7 @@ object NodeInterestRates {
/**
* The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing.
*/
class Service(services: ServiceHubInternal) : AcceptsFileUpload, AbstractNodeService(services.networkService, services.networkMapCache) {
class Service(services: ServiceHubInternal) : AcceptsFileUpload, AbstractNodeService(services) {
val ss = services.storageService
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, services.clock)

View File

@ -11,14 +11,13 @@ import com.r3corda.core.node.services.WalletService
import com.r3corda.core.testing.InMemoryWalletService
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
import com.r3corda.core.utilities.loggerFor
import com.r3corda.testing.node.TestTransactionManager
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.utilities.AddOrRemove
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.slf4j.Logger
import java.nio.file.Files
@ -109,7 +108,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun makeNetworkMapService() {
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires)
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
inNodeNetworkMapService = InMemoryNetworkMapService(services, reg)
}
override fun generateKeyPair(): KeyPair = keyPair ?: super.generateKeyPair()