Convert long lived services supporting protocol listeners to plugins

This commit is contained in:
Matthew Nesbit
2016-07-28 11:58:35 +01:00
parent 454f555728
commit 235497e0f4
17 changed files with 257 additions and 192 deletions

View File

@ -17,6 +17,7 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.api.APIServer
@ -60,7 +61,7 @@ import java.util.*
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val networkMapService: NodeInfo?,
val advertisedServices: Set<ServiceType>, val platformClock: Clock) {
val advertisedServices: Set<ServiceType>, val platformClock: Clock): SingletonSerializeAsToken() {
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
@ -124,6 +125,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
var customServices: List<Any> = emptyList()
inline fun <reified T: Any>getCustomService() : T {
return customServices.single{ x-> x is T } as T
}
var isPreviousCheckpointsPresent = false
private set
@ -151,7 +157,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
net = makeMessagingService()
netMapCache = InMemoryNetworkMapCache(net)
wallet = NodeWalletService(services)
makeInterestRatesOracleService()
identity = makeIdentityService()
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
@ -159,17 +165,18 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey))
api = APIServerImpl(this)
scheduler = NodeSchedulerService(services)
smm = StateMachineManager(services,
listOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler, interestRatesService),
checkpointStorage,
serverThread)
protocolLogicFactory = initialiseProtocolLogicFactory()
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, services)
NotaryChangeService(net, smm, services.networkMapCache)
val tokenizableServices = mutableListOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler)
buildPluginServices(tokenizableServices)
smm = StateMachineManager(services,
listOf(tokenizableServices),
checkpointStorage,
serverThread)
buildAdvertisedServices()
@ -199,6 +206,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
return ProtocolLogicRefFactory(protocolWhitelist)
}
private fun buildPluginServices(tokenizableServices: MutableList<Any>) {
val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins }
val serviceList = mutableListOf<Any>()
for (serviceClass in pluginServices) {
val service = serviceClass.getConstructor(ServiceHubInternal::class.java).newInstance(services)
serviceList.add(service)
tokenizableServices.add(service)
if(service is AcceptsFileUpload) {
_servicesThatAcceptUploads += service
}
}
customServices = serviceList
}
/**
* Run any tasks that are needed to ensure the node is in a correct state before running start().
@ -280,14 +301,6 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
}
lateinit var interestRatesService: NodeInterestRates.Service
open protected fun makeInterestRatesOracleService() {
// TODO: Once the service has data, automatically register with the network map service (once built).
interestRatesService = NodeInterestRates.Service(this)
_servicesThatAcceptUploads += interestRatesService
}
protected open fun makeIdentityService(): IdentityService {
val service = InMemoryIdentityService()
if (networkMapService != null)

View File

@ -15,7 +15,6 @@ import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.random63BitValue
import com.r3corda.core.success
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.JsonSupport
import com.r3corda.protocols.TwoPartyDealProtocol
@ -40,11 +39,6 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): ListenableFuture<Unit> {
// TODO: until we have general session initiation
FixingSessionInitiationHandler.register(banks[0])
FixingSessionInitiationHandler.register(banks[1])
val future = SettableFuture.create<Unit>()
nodeAKey = banks[0].keyManagement.freshKey()

View File

@ -115,9 +115,10 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
override fun makeInterestRatesOracleService() {
super.makeInterestRatesOracleService()
interestRatesService.upload(javaClass.getResourceAsStream("example.rates.txt"))
override fun start(): MockNetwork.MockNode {
super.start()
getCustomService<NodeInterestRates.Service>().upload(javaClass.getResourceAsStream("example.rates.txt"))
return this
}
}
}

View File

@ -1,22 +0,0 @@
package com.r3corda.node.services
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiationHandler {
fun register(node: AbstractNode) {
node.net.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
node.smm.add("fixings", protocol)
}
}
}

View File

@ -1,30 +1,38 @@
package com.r3corda.node.services
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.AbstractStateReplacementProtocol
import com.r3corda.protocols.NotaryChangeProtocol
/**
* A service that monitors the network for requests for changing the notary of a state,
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
*/
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
init {
addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
)
object NotaryChange {
class Plugin : CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = emptyMap()
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack {
val protocol = NotaryChangeProtocol.Acceptor(
req.replyToParty,
req.sessionID,
req.sessionIdForSend)
smm.add(NotaryChangeProtocol.TOPIC, protocol)
return Ack
/**
* A service that monitors the network for requests for changing the notary of a state,
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
*/
class Service(val services: ServiceHubInternal) : AbstractNodeService(services.networkService, services.networkMapCache) {
init {
addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
)
}
private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack {
val protocol = NotaryChangeProtocol.Acceptor(
req.replyToParty,
req.sessionID,
req.sessionIdForSend)
services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
return Ack
}
}
}

View File

@ -0,0 +1,31 @@
package com.r3corda.node.services.clientapi
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiation {
class Plugin: CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = emptyMap()
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
class Service(services: ServiceHubInternal) {
init {
services.networkService.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
services.startProtocol("fixings", protocol)
}
}
}
}

View File

@ -13,9 +13,9 @@ import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.AcceptsFileUpload
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.FiberBox
import com.r3corda.protocols.RatesFixProtocol
import com.r3corda.protocols.ServiceRequestMessage
@ -42,12 +42,23 @@ import javax.annotation.concurrent.ThreadSafe
*/
object NodeInterestRates {
object Type : ServiceType("corda.interest_rates")
/**
* Register the protocol that is used with the Fixing integration tests.
*/
class Plugin : CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = mapOf(Pair(TwoPartyDealProtocol.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name)))
override val servicePlugins: List<Class<*>> = listOf(NodeInterestRates.Service::class.java)
override val staticServeDirs: Map<String, String> = emptyMap()
}
/**
* The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing.
*/
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService, node.services.networkMapCache) {
val ss = node.services.storageService
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock)
class Service(services: ServiceHubInternal) : AcceptsFileUpload, AbstractNodeService(services.networkService, services.networkMapCache) {
val ss = services.storageService
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, services.clock)
private val logger = LoggerFactory.getLogger(Service::class.java)
@ -65,7 +76,7 @@ object NodeInterestRates {
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way.
*/
node.smm.add("fixing", FixQueryHandler(this, req as RatesFixProtocol.QueryRequest))
services.startProtocol("fixing", FixQueryHandler(this, req as RatesFixProtocol.QueryRequest))
Unit
}
},
@ -96,15 +107,6 @@ object NodeInterestRates {
}
}
/**
* Register the protocol that is used with the Fixing integration tests.
*/
class FixingServicePlugin : CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = mapOf(Pair(TwoPartyDealProtocol.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name)))
override val staticServeDirs: Map<String, String> = emptyMap()
}
// File upload support
override val dataTypePrefix = "interest-rates"
override val acceptableFileExtensions = listOf(".rates", ".txt")

View File

@ -4,104 +4,110 @@ import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.serialization.serialize
import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.FetchAttachmentsProtocol
import com.r3corda.protocols.FetchDataProtocol
import com.r3corda.protocols.FetchTransactionsProtocol
import com.r3corda.protocols.PartyRequestMessage
import com.r3corda.protocols.ResolveTransactionsProtocol
import com.r3corda.protocols.*
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
/**
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer.
*
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
@ThreadSafe
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
object DataVending {
class Plugin : CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = emptyMap()
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
/**
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer.
*
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
@ThreadSafe
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both
// should be fixed at the same time.
class DataVendingService(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) {
companion object {
val logger = loggerFor<DataVendingService>()
class Service(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) {
companion object {
val logger = loggerFor<DataVending.Service>()
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
}
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
init {
addMessageHandler(FetchTransactionsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleTXRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(FetchAttachmentsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
require(req.hashes.isNotEmpty())
return req.hashes.map {
val tx = storage.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
}
}
private fun handleAttachmentRequest(req: FetchDataProtocol.Request): List<ByteArray?> {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
require(req.hashes.isNotEmpty())
return req.hashes.map {
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
init {
addMessageHandler(FetchTransactionsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleTXRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(FetchAttachmentsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
require(req.hashes.isNotEmpty())
return req.hashes.map {
val tx = storage.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
}
}
private fun handleAttachmentRequest(req: FetchDataProtocol.Request): List<ByteArray?> {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
require(req.hashes.isNotEmpty())
return req.hashes.map {
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
}
}
}
}
}

View File

@ -1,2 +1,5 @@
# Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry
com.r3corda.node.services.clientapi.NodeInterestRates$Service$FixingServicePlugin
com.r3corda.node.services.clientapi.FixingSessionInitiation$Plugin
com.r3corda.node.services.clientapi.NodeInterestRates$Plugin
com.r3corda.node.services.NotaryChange$Plugin
com.r3corda.node.services.persistence.DataVending$Plugin

View File

@ -14,7 +14,7 @@ import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DataVendingService
import com.r3corda.node.services.persistence.DataVending
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService
import java.time.Clock
@ -68,7 +68,7 @@ open class MockServices(
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected.
DataVendingService(net, this)
DataVending.Service(this)
}
}
}

View File

@ -105,7 +105,7 @@ class NodeInterestRatesTest {
fun network() {
val net = MockNetwork()
val (n1, n2) = net.createTwoNodes()
n2.interestRatesService.oracle.knownFixes = TEST_DATA
n2.getCustomService<NodeInterestRates.Service>().oracle.knownFixes = TEST_DATA
val tx = TransactionType.General.Builder()
val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")