Merged in mnesbit-cor-199-more-plugins (pull request #249)

Mnesbit cor 199 more plugins
This commit is contained in:
Matthew Nesbit
2016-07-28 16:37:25 +01:00
18 changed files with 254 additions and 210 deletions

View File

@ -4,20 +4,20 @@ package com.r3corda.core.node
* Implement this interface on a class advertised in a META-INF/services/com.r3corda.core.node.CordaPluginRegistry file * Implement this interface on a class advertised in a META-INF/services/com.r3corda.core.node.CordaPluginRegistry file
* to extend a Corda node with additional application services. * to extend a Corda node with additional application services.
*/ */
interface CordaPluginRegistry { abstract class CordaPluginRegistry {
/** /**
* List of JAX-RS classes inside the contract jar. They are expected to have a single parameter constructor that takes a ServiceHub as input. * List of JAX-RS classes inside the contract jar. They are expected to have a single parameter constructor that takes a ServiceHub as input.
* These are listed as Class<*>, because they will be instantiated inside an AttachmentClassLoader so that subsequent protocols, contracts, etc * These are listed as Class<*>, because in the future they will be instantiated inside a ClassLoader so that
* will be running in the appropriate isolated context. * Cordapp code can be loaded dynamically.
*/ */
val webApis: List<Class<*>> open val webApis: List<Class<*>> = emptyList()
/** /**
* Map of static serving endpoints to the matching resource directory. All endpoints will be prefixed with "/web" and postfixed with "\*. * Map of static serving endpoints to the matching resource directory. All endpoints will be prefixed with "/web" and postfixed with "\*.
* Resource directories can be either on disk directories (especially when debugging) in the form "a/b/c". Serving from a JAR can * Resource directories can be either on disk directories (especially when debugging) in the form "a/b/c". Serving from a JAR can
* be specified with: javaClass.getResource("<folder-in-jar>").toExternalForm() * be specified with: javaClass.getResource("<folder-in-jar>").toExternalForm()
*/ */
val staticServeDirs: Map<String, String> open val staticServeDirs: Map<String, String> = emptyMap()
/** /**
* A Map with an entry for each consumed protocol used by the webAPIs. * A Map with an entry for each consumed protocol used by the webAPIs.
@ -26,5 +26,13 @@ interface CordaPluginRegistry {
* Standard java.lang.* and kotlin.* types do not need to be included explicitly. * Standard java.lang.* and kotlin.* types do not need to be included explicitly.
* This is used to extend the white listed protocols that can be initiated from the ServiceHub invokeProtocolAsync method. * This is used to extend the white listed protocols that can be initiated from the ServiceHub invokeProtocolAsync method.
*/ */
val requiredProtocols: Map<String, Set<String>> open val requiredProtocols: Map<String, Set<String>> = emptyMap()
/**
* List of additional long lived services to be hosted within the node.
* They are expected to have a single parameter constructor that takes a ServiceHubInternal as input.
* The ServiceHubInternal will be fully constructed before the plugin service is created and will
* allow access to the protocol factory and protocol initiation entry points there.
*/
open val servicePlugins: List<Class<*>> = emptyList()
} }

View File

@ -17,12 +17,11 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.node.api.APIServer import com.r3corda.node.api.APIServer
import com.r3corda.node.services.NotaryChangeService
import com.r3corda.node.services.api.* import com.r3corda.node.services.api.*
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.events.NodeSchedulerService import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.events.ScheduledActivityObserver
@ -60,7 +59,7 @@ import java.util.*
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the // In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in. // AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val networkMapService: NodeInfo?, abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val networkMapService: NodeInfo?,
val advertisedServices: Set<ServiceType>, val platformClock: Clock) { val advertisedServices: Set<ServiceType>, val platformClock: Clock): SingletonSerializeAsToken() {
companion object { companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key" val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public" val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
@ -124,6 +123,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var api: APIServer lateinit var api: APIServer
lateinit var scheduler: SchedulerService lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory lateinit var protocolLogicFactory: ProtocolLogicRefFactory
val customServices: ArrayList<Any> = ArrayList()
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single()
var isPreviousCheckpointsPresent = false var isPreviousCheckpointsPresent = false
private set private set
@ -151,7 +155,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
net = makeMessagingService() net = makeMessagingService()
netMapCache = InMemoryNetworkMapCache(net) netMapCache = InMemoryNetworkMapCache(net)
wallet = NodeWalletService(services) wallet = NodeWalletService(services)
makeInterestRatesOracleService()
identity = makeIdentityService() identity = makeIdentityService()
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
@ -159,17 +163,18 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey)) keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey))
api = APIServerImpl(this) api = APIServerImpl(this)
scheduler = NodeSchedulerService(services) scheduler = NodeSchedulerService(services)
smm = StateMachineManager(services,
listOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler, interestRatesService),
checkpointStorage,
serverThread)
protocolLogicFactory = initialiseProtocolLogicFactory() protocolLogicFactory = initialiseProtocolLogicFactory()
// This object doesn't need to be referenced from this class because it registers handlers on the network val tokenizableServices = mutableListOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler)
// service and so that keeps it from being collected.
DataVendingService(net, services) customServices.clear()
NotaryChangeService(net, smm, services.networkMapCache) customServices.addAll(buildPluginServices(tokenizableServices))
smm = StateMachineManager(services,
listOf(tokenizableServices),
checkpointStorage,
serverThread)
buildAdvertisedServices() buildAdvertisedServices()
@ -199,6 +204,20 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
return ProtocolLogicRefFactory(protocolWhitelist) return ProtocolLogicRefFactory(protocolWhitelist)
} }
private fun buildPluginServices(tokenizableServices: MutableList<Any>): List<Any> {
val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins }
val serviceList = mutableListOf<Any>()
for (serviceClass in pluginServices) {
val service = serviceClass.getConstructor(ServiceHubInternal::class.java).newInstance(services)
serviceList.add(service)
tokenizableServices.add(service)
if(service is AcceptsFileUpload) {
_servicesThatAcceptUploads += service
}
}
return serviceList
}
/** /**
* Run any tasks that are needed to ensure the node is in a correct state before running start(). * Run any tasks that are needed to ensure the node is in a correct state before running start().
@ -280,14 +299,6 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
} }
} }
lateinit var interestRatesService: NodeInterestRates.Service
open protected fun makeInterestRatesOracleService() {
// TODO: Once the service has data, automatically register with the network map service (once built).
interestRatesService = NodeInterestRates.Service(this)
_servicesThatAcceptUploads += interestRatesService
}
protected open fun makeIdentityService(): IdentityService { protected open fun makeIdentityService(): IdentityService {
val service = InMemoryIdentityService() val service = InMemoryIdentityService()
if (networkMapService != null) if (networkMapService != null)

View File

@ -15,7 +15,6 @@ import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.node.services.testing.MockIdentityService import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.success import com.r3corda.core.success
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.JsonSupport import com.r3corda.node.utilities.JsonSupport
import com.r3corda.protocols.TwoPartyDealProtocol import com.r3corda.protocols.TwoPartyDealProtocol
@ -40,11 +39,6 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>()) private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): ListenableFuture<Unit> { override fun startMainSimulation(): ListenableFuture<Unit> {
// TODO: until we have general session initiation
FixingSessionInitiationHandler.register(banks[0])
FixingSessionInitiationHandler.register(banks[1])
val future = SettableFuture.create<Unit>() val future = SettableFuture.create<Unit>()
nodeAKey = banks[0].keyManagement.freshKey() nodeAKey = banks[0].keyManagement.freshKey()

View File

@ -115,9 +115,10 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
} }
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) { return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
override fun makeInterestRatesOracleService() { override fun start(): MockNetwork.MockNode {
super.makeInterestRatesOracleService() super.start()
interestRatesService.upload(javaClass.getResourceAsStream("example.rates.txt")) findService<NodeInterestRates.Service>().upload(javaClass.getResourceAsStream("example.rates.txt"))
return this
} }
} }
} }

View File

@ -1,22 +0,0 @@
package com.r3corda.node.services
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiationHandler {
fun register(node: AbstractNode) {
node.net.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
node.smm.add("fixings", protocol)
}
}
}

View File

@ -1,30 +1,36 @@
package com.r3corda.node.services package com.r3corda.node.services
import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.AbstractStateReplacementProtocol import com.r3corda.protocols.AbstractStateReplacementProtocol
import com.r3corda.protocols.NotaryChangeProtocol import com.r3corda.protocols.NotaryChangeProtocol
/**
* A service that monitors the network for requests for changing the notary of a state, object NotaryChange {
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met. class Plugin : CordaPluginRegistry() {
*/ override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
init {
addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
)
} }
private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack { /**
val protocol = NotaryChangeProtocol.Acceptor( * A service that monitors the network for requests for changing the notary of a state,
req.replyToParty, * and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
req.sessionID, */
req.sessionIdForSend) class Service(val services: ServiceHubInternal) : AbstractNodeService(services.networkService, services.networkMapCache) {
smm.add(NotaryChangeProtocol.TOPIC, protocol) init {
return Ack addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
)
}
private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack {
val protocol = NotaryChangeProtocol.Acceptor(
req.replyToParty,
req.sessionID,
req.sessionIdForSend)
services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
return Ack
}
} }
} }

View File

@ -0,0 +1,29 @@
package com.r3corda.node.services.clientapi
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiation {
class Plugin: CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
class Service(services: ServiceHubInternal) {
init {
services.networkService.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
services.startProtocol("fixings", protocol)
}
}
}
}

View File

@ -13,9 +13,9 @@ import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.AcceptsFileUpload import com.r3corda.node.services.api.AcceptsFileUpload
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.FiberBox import com.r3corda.node.utilities.FiberBox
import com.r3corda.protocols.RatesFixProtocol import com.r3corda.protocols.RatesFixProtocol
import com.r3corda.protocols.ServiceRequestMessage import com.r3corda.protocols.ServiceRequestMessage
@ -42,12 +42,21 @@ import javax.annotation.concurrent.ThreadSafe
*/ */
object NodeInterestRates { object NodeInterestRates {
object Type : ServiceType("corda.interest_rates") object Type : ServiceType("corda.interest_rates")
/**
* Register the protocol that is used with the Fixing integration tests.
*/
class Plugin : CordaPluginRegistry() {
override val requiredProtocols: Map<String, Set<String>> = mapOf(Pair(TwoPartyDealProtocol.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name)))
override val servicePlugins: List<Class<*>> = listOf(NodeInterestRates.Service::class.java)
}
/** /**
* The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing. * The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing.
*/ */
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService, node.services.networkMapCache) { class Service(services: ServiceHubInternal) : AcceptsFileUpload, AbstractNodeService(services.networkService, services.networkMapCache) {
val ss = node.services.storageService val ss = services.storageService
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock) val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, services.clock)
private val logger = LoggerFactory.getLogger(Service::class.java) private val logger = LoggerFactory.getLogger(Service::class.java)
@ -65,7 +74,7 @@ object NodeInterestRates {
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet], * Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way. * if they haven't already been uploaded that way.
*/ */
node.smm.add("fixing", FixQueryHandler(this, req as RatesFixProtocol.QueryRequest)) services.startProtocol("fixing", FixQueryHandler(this, req as RatesFixProtocol.QueryRequest))
Unit Unit
} }
}, },
@ -96,15 +105,6 @@ object NodeInterestRates {
} }
} }
/**
* Register the protocol that is used with the Fixing integration tests.
*/
class FixingServicePlugin : CordaPluginRegistry {
override val webApis: List<Class<*>> = emptyList()
override val requiredProtocols: Map<String, Set<String>> = mapOf(Pair(TwoPartyDealProtocol.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name)))
override val staticServeDirs: Map<String, String> = emptyMap()
}
// File upload support // File upload support
override val dataTypePrefix = "interest-rates" override val dataTypePrefix = "interest-rates"
override val acceptableFileExtensions = listOf(".rates", ".txt") override val acceptableFileExtensions = listOf(".rates", ".txt")

View File

@ -4,104 +4,108 @@ import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.failure import com.r3corda.core.failure
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.core.success import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.FetchAttachmentsProtocol import com.r3corda.protocols.*
import com.r3corda.protocols.FetchDataProtocol
import com.r3corda.protocols.FetchTransactionsProtocol
import com.r3corda.protocols.PartyRequestMessage
import com.r3corda.protocols.ResolveTransactionsProtocol
import java.io.InputStream import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
/** object DataVending {
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer. class Plugin : CordaPluginRegistry() {
* override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There }
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have /**
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as * This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* such the hash of a piece of data can be seen as a type of password allowing access to it. * glue that sits between the network layer and the database layer.
* *
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. * Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
*/ * are no access control lists. If you want to keep some data private, then you must be careful who you give its name
@ThreadSafe * to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because * its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
@ThreadSafe
// TODO: I don't like that this needs ServiceHubInternal, but passing in a state machine breaks MockServices because
// the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both // the state machine isn't set when this is constructed. [NodeSchedulerService] has the same problem, and both
// should be fixed at the same time. // should be fixed at the same time.
class DataVendingService(net: MessagingService, private val services: ServiceHubInternal) : AbstractNodeService(net, services.networkMapCache) { class Service(val services: ServiceHubInternal) : AbstractNodeService(services.networkService, services.networkMapCache) {
companion object { companion object {
val logger = loggerFor<DataVendingService>() val logger = loggerFor<DataVending.Service>()
/** Topic for messages notifying a node of a new transaction */ /** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx" val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
}
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
init {
addMessageHandler(FetchTransactionsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleTXRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(FetchAttachmentsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
require(req.hashes.isNotEmpty())
return req.hashes.map {
val tx = storage.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
} }
}
private fun handleAttachmentRequest(req: FetchDataProtocol.Request): List<ByteArray?> { val storage = services.storageService
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
require(req.hashes.isNotEmpty()) data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
return req.hashes.map { data class NotifyTxResponseMessage(val accepted: Boolean)
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) { init {
logger.info("Got request for unknown attachment $it") addMessageHandler(FetchTransactionsProtocol.TOPIC,
null { req: FetchDataProtocol.Request -> handleTXRequest(req) },
} else { { message, e -> logger.error("Failure processing data vending request.", e) }
jar.readBytes() )
addMessageHandler(FetchAttachmentsProtocol.TOPIC,
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure {
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC + "." + req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {
require(req.hashes.isNotEmpty())
return req.hashes.map {
val tx = storage.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
}
}
private fun handleAttachmentRequest(req: FetchDataProtocol.Request): List<ByteArray?> {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
require(req.hashes.isNotEmpty())
return req.hashes.map {
val jar: InputStream? = storage.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
} }
} }
} }
} }

View File

@ -1,2 +1,5 @@
# Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry # Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry
com.r3corda.node.services.clientapi.NodeInterestRates$Service$FixingServicePlugin com.r3corda.node.services.clientapi.FixingSessionInitiation$Plugin
com.r3corda.node.services.clientapi.NodeInterestRates$Plugin
com.r3corda.node.services.NotaryChange$Plugin
com.r3corda.node.services.persistence.DataVending$Plugin

View File

@ -14,7 +14,7 @@ import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.MockNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DataVendingService import com.r3corda.node.services.persistence.DataVending
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService import com.r3corda.node.services.wallet.NodeWalletService
import java.time.Clock import java.time.Clock
@ -68,7 +68,7 @@ open class MockServices(
if (net != null && storage != null) { if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected. // on the networking service, so that will keep it from being collected.
DataVendingService(net, this) DataVending.Service(this)
} }
} }
} }

View File

@ -105,7 +105,7 @@ class NodeInterestRatesTest {
fun network() { fun network() {
val net = MockNetwork() val net = MockNetwork()
val (n1, n2) = net.createTwoNodes() val (n1, n2) = net.createTwoNodes()
n2.interestRatesService.oracle.knownFixes = TEST_DATA n2.findService<NodeInterestRates.Service>().oracle.knownFixes = TEST_DATA
val tx = TransactionType.General.Builder() val tx = TransactionType.General.Builder()
val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M") val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")

View File

@ -2,23 +2,19 @@ package com.r3corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.asset.Cash import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.testing.DUMMY_NOTARY import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.MEGA_CORP_KEY
import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.internal.testing.MockNetwork
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import javax.annotation.Signed
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFalse import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue import kotlin.test.assertTrue
/** /**
@ -38,12 +34,12 @@ class DataVendingServiceTests {
class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction) class NotifyPSM(val server: NodeInfo, val tx: SignedTransaction)
: ProtocolLogic<Boolean>() { : ProtocolLogic<Boolean>() {
override val topic: String get() = DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC override val topic: String get() = DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC
@Suspendable @Suspendable
override fun call(): Boolean { override fun call(): Boolean {
val sessionID = random63BitValue() val sessionID = random63BitValue()
val req = DataVendingService.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID) val req = DataVending.Service.NotifyTxRequestMessage(tx, serviceHub.storageService.myLegalIdentity, sessionID)
return sendAndReceive<DataVendingService.NotifyTxResponseMessage>(server.identity, 0, sessionID, req).validate { it.accepted } return sendAndReceive<DataVending.Service.NotifyTxResponseMessage>(server.identity, 0, sessionID, req).validate { it.accepted }
} }
} }
@ -62,7 +58,7 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction() val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx)) val notifyPsm = registerNode.smm.add(DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted // Check it was accepted
network.runNetwork() network.runNetwork()
@ -93,7 +89,7 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction(false) val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size) assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.size)
val notifyPsm = registerNode.smm.add(DataVendingService.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx)) val notifyPsm = registerNode.smm.add(DataVending.Service.NOTIFY_TX_PROTOCOL_TOPIC, NotifyPSM(walletServiceNode.info, tx))
// Check it was accepted // Check it was accepted
network.runNetwork() network.runNetwork()

View File

@ -17,7 +17,6 @@ import com.r3corda.demos.protocols.UpdateBusinessDayProtocol
import com.r3corda.node.internal.AbstractNode import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
@ -258,7 +257,7 @@ object CliParamsSpec {
val help = parser.accepts("help", "Prints this help").forHelp() val help = parser.accepts("help", "Prints this help").forHelp()
} }
class IRSDemoPluginRegistry : CordaPluginRegistry { class IRSDemoPluginRegistry : CordaPluginRegistry() {
override val webApis: List<Class<*>> = listOf(InterestRateSwapAPI::class.java) override val webApis: List<Class<*>> = listOf(InterestRateSwapAPI::class.java)
override val staticServeDirs: Map<String, String> = mapOf("irsdemo" to javaClass.getResource("irswebdemo").toExternalForm()) override val staticServeDirs: Map<String, String> = mapOf("irsdemo" to javaClass.getResource("irswebdemo").toExternalForm())
override val requiredProtocols: Map<String, Set<String>> = mapOf( override val requiredProtocols: Map<String, Set<String>> = mapOf(
@ -332,11 +331,6 @@ private fun runNode(cliParams: CliParams.RunNode): Int {
val networkMap = createRecipient(cliParams.mapAddress) val networkMap = createRecipient(cliParams.mapAddress)
val node = startNode(cliParams, networkMap) val node = startNode(cliParams, networkMap)
// Register handlers for the demo
AutoOfferProtocol.Handler.register(node)
UpdateBusinessDayProtocol.Handler.register(node)
ExitServerProtocol.Handler.register(node)
FixingSessionInitiationHandler.register(node)
if (cliParams.uploadRates) { if (cliParams.uploadRates) {
runUploadRates(cliParams.apiAddress) runUploadRates(cliParams.apiAddress)

View File

@ -6,11 +6,12 @@ import com.google.common.util.concurrent.Futures
import com.r3corda.core.contracts.DealState import com.r3corda.core.contracts.DealState
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.protocols.TwoPartyDealProtocol import com.r3corda.protocols.TwoPartyDealProtocol
/** /**
@ -27,7 +28,12 @@ object AutoOfferProtocol {
val notary: Party, val notary: Party,
val otherSessionID: Long, val dealBeingOffered: DealState) val otherSessionID: Long, val dealBeingOffered: DealState)
object Handler { class Plugin: CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
class Service(services: ServiceHubInternal) {
object RECEIVED : ProgressTracker.Step("Received offer") object RECEIVED : ProgressTracker.Step("Received offer")
object DEALING : ProgressTracker.Step("Starting the deal protocol") { object DEALING : ProgressTracker.Step("Starting the deal protocol") {
@ -46,16 +52,16 @@ object AutoOfferProtocol {
} }
} }
fun register(node: Node) { init {
node.net.addMessageHandler("$TOPIC.0") { msg, registration -> services.networkService.addMessageHandler("$TOPIC.0") { msg, registration ->
val progressTracker = tracker() val progressTracker = tracker()
progressTracker.currentStep = RECEIVED progressTracker.currentStep = RECEIVED
val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>() val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>()
// Put the deal onto the ledger // Put the deal onto the ledger
progressTracker.currentStep = DEALING progressTracker.currentStep = DEALING
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, autoOfferMessage.notary, val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, autoOfferMessage.notary,
autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.getChildProgressTracker(DEALING)!!) autoOfferMessage.dealBeingOffered, services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.getChildProgressTracker(DEALING)!!)
val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller) val future = services.startProtocol("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller)
// This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it // This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it
Futures.addCallback(future, Callback() { Futures.addCallback(future, Callback() {
seller.progressTracker.currentStep = ProgressTracker.DONE seller.progressTracker.currentStep = ProgressTracker.DONE

View File

@ -2,10 +2,11 @@ package com.r3corda.demos.protocols
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.Node import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.MockNetworkMapCache
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -19,10 +20,14 @@ object ExitServerProtocol {
data class ExitMessage(val exitCode: Int) data class ExitMessage(val exitCode: Int)
object Handler { class Plugin: CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
fun register(node: Node) { class Service(services: ServiceHubInternal) {
node.net.addMessageHandler("$TOPIC.0") { msg, registration ->
init {
services.networkService.addMessageHandler("$TOPIC.0") { msg, registration ->
// Just to validate we got the message // Just to validate we got the message
if (enabled) { if (enabled) {
val message = msg.data.deserialize<ExitMessage>() val message = msg.data.deserialize<ExitMessage>()

View File

@ -1,12 +1,14 @@
package com.r3corda.demos.protocols package com.r3corda.demos.protocols
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.demos.DemoClock import com.r3corda.demos.DemoClock
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.MockNetworkMapCache
import java.time.LocalDate import java.time.LocalDate
@ -19,12 +21,16 @@ object UpdateBusinessDayProtocol {
data class UpdateBusinessDayMessage(val date: LocalDate) data class UpdateBusinessDayMessage(val date: LocalDate)
object Handler { class Plugin: CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
}
fun register(node: Node) { class Service(services: ServiceHubInternal) {
node.net.addMessageHandler("${TOPIC}.0") { msg, registration ->
init {
services.networkService.addMessageHandler("${TOPIC}.0") { msg, registration ->
val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>() val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>()
(node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date) (services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)
} }
} }
} }

View File

@ -1,2 +1,5 @@
# Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry # Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry
com.r3corda.demos.IRSDemoPluginRegistry com.r3corda.demos.IRSDemoPluginRegistry
com.r3corda.demos.protocols.AutoOfferProtocol$Plugin
com.r3corda.demos.protocols.ExitServerProtocol$Plugin
com.r3corda.demos.protocols.UpdateBusinessDayProtocol$Plugin