mirror of
https://github.com/corda/corda.git
synced 2025-02-21 01:42:24 +00:00
Make the IRS Demo web api an api plugin (scanned from the Node classpath) and use the same permission checking entry point for web api's as the scheduler.
This commit is contained in:
parent
4356cef1cd
commit
99fe3dfe75
@ -0,0 +1,26 @@
|
||||
package com.r3corda.core.node
|
||||
|
||||
/**
|
||||
* Implement this interface on a class advertised in a META-INF/services/com.r3corda.core.node.CordaPluginRegistry file
|
||||
* to extend a Corda node with additional application services.
|
||||
*/
|
||||
interface CordaPluginRegistry {
|
||||
/**
|
||||
* List of JAX-RS classes inside the contract jar. They are expected to have a single parameter constructor that takes a ServiceHub as input.
|
||||
* These are listed as Class<*>, because they will be instantiated inside an AttachmentClassLoader so that subsequent protocols, contracts, etc
|
||||
* will be running in the appropriate isolated context.
|
||||
*/
|
||||
val webApis: List<Class<*>>
|
||||
|
||||
/**
|
||||
* Set of top level protocol class names that will be initiated by the plugin.
|
||||
* This is used to extend the white listed protocols that can be initiated from the ServiceHub invokeProtocolAsync method
|
||||
*/
|
||||
val protocolLogicClassNameWhitelist: Set<String>
|
||||
|
||||
/**
|
||||
* Set of associated constructor parameters that will be passed into the protocols.
|
||||
* This is used to control what can be passed to protocols initiated from the ServiceHub invokeProtocolAsync method
|
||||
*/
|
||||
val protocolArgsClassNameWhitelist: Set<String>
|
||||
}
|
@ -1,8 +1,10 @@
|
||||
package com.r3corda.core.node
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import java.time.Clock
|
||||
|
||||
/**
|
||||
@ -61,4 +63,21 @@ interface ServiceHub {
|
||||
val definingTx = storageService.validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)
|
||||
return definingTx.tx.outputs[stateRef.index]
|
||||
}
|
||||
|
||||
/**
|
||||
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the protocol.
|
||||
*
|
||||
* @throws IllegalProtocolLogicException or IllegalArgumentException if there are problems with the [logicType] or [args]
|
||||
*/
|
||||
fun <T: Any>invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T>
|
||||
|
||||
/**
|
||||
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the protocol.
|
||||
*
|
||||
* Will block and return any protocol result when the protocol eventually completes.
|
||||
*/
|
||||
fun <T: Any>invokeProtocolSync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): T {
|
||||
return invokeProtocolAsync(logicType, *args).get()
|
||||
}
|
||||
|
||||
}
|
@ -5,10 +5,12 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.contracts.StateRef
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.runOnNextMessage
|
||||
import com.r3corda.core.node.CityDatabase
|
||||
import com.r3corda.core.node.CordaPluginRegistry
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.*
|
||||
@ -46,12 +48,14 @@ import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressObserver
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.protocols.TwoPartyDealProtocol
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
@ -97,7 +101,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
|
||||
// Internal only
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
override val protocolLogicRefFactory = ProtocolLogicRefFactory()
|
||||
override val protocolLogicRefFactory:ProtocolLogicRefFactory get() = protocolLogicFactory
|
||||
|
||||
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
|
||||
return smm.add(loggerName, logic)
|
||||
@ -124,6 +128,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
lateinit var net: MessagingService
|
||||
lateinit var api: APIServer
|
||||
lateinit var scheduler: SchedulerService
|
||||
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
|
||||
var isPreviousCheckpointsPresent = false
|
||||
private set
|
||||
|
||||
@ -158,6 +163,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
checkpointStorage,
|
||||
serverThread)
|
||||
|
||||
protocolLogicFactory = initialiseProtocolLogicFactory()
|
||||
|
||||
// This object doesn't need to be referenced from this class because it registers handlers on the network
|
||||
// service and so that keeps it from being collected.
|
||||
DataVendingService(net, storage, services.networkMapCache)
|
||||
@ -180,6 +187,19 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return this
|
||||
}
|
||||
|
||||
private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory {
|
||||
val serviceLoader = ServiceLoader.load(CordaPluginRegistry::class.java)
|
||||
val pluginRegistries = serviceLoader.toList()
|
||||
val protocolLogicClassNameWhitelist = pluginRegistries.flatMap { x -> x.protocolLogicClassNameWhitelist }.toMutableSet()
|
||||
val protocolArgsClassNameWhitelist = pluginRegistries.flatMap { x -> x.protocolArgsClassNameWhitelist }.toMutableSet()
|
||||
|
||||
//Add in standard protocol whitelist
|
||||
protocolLogicClassNameWhitelist.add(TwoPartyDealProtocol.FixingRoleDecider::class.java.name)
|
||||
protocolArgsClassNameWhitelist.add(StateRef::class.java.name)
|
||||
protocolArgsClassNameWhitelist.add(Duration::class.java.name)
|
||||
return ProtocolLogicRefFactory(protocolLogicClassNameWhitelist, protocolArgsClassNameWhitelist)
|
||||
}
|
||||
|
||||
/**
|
||||
* Run any tasks that are needed to ensure the node is in a correct state before running start()
|
||||
*/
|
||||
|
@ -3,10 +3,11 @@ package com.r3corda.node.internal
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.CordaPluginRegistry
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.api.APIServer
|
||||
import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||
@ -27,9 +28,9 @@ import java.io.RandomAccessFile
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.FileLock
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
import javax.management.ObjectName
|
||||
|
||||
class ConfigurationException(message: String) : Exception(message)
|
||||
@ -55,8 +56,7 @@ class ConfigurationException(message: String) : Exception(message)
|
||||
*/
|
||||
class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, configuration: NodeConfiguration,
|
||||
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
|
||||
clock: Clock = NodeClock(),
|
||||
val clientAPIs: List<Class<*>> = listOf()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
|
||||
clock: Clock = NodeClock()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
|
||||
companion object {
|
||||
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
|
||||
val DEFAULT_PORT = 31337
|
||||
@ -109,12 +109,15 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
resourceConfig.register(ResponseFilter())
|
||||
resourceConfig.register(api)
|
||||
|
||||
for(customAPIClass in clientAPIs) {
|
||||
val customAPI = customAPIClass.getConstructor(APIServer::class.java).newInstance(api)
|
||||
val serviceLoader = ServiceLoader.load(CordaPluginRegistry::class.java)
|
||||
val pluginRegistries = serviceLoader.toList()
|
||||
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
|
||||
for (webapi in webAPIsOnClasspath) {
|
||||
log.info("Add Plugin web API from attachment ${webapi.name}")
|
||||
val customAPI = webapi.getConstructor(ServiceHub::class.java).newInstance(services)
|
||||
resourceConfig.register(customAPI)
|
||||
}
|
||||
|
||||
|
||||
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
|
||||
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
|
||||
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
|
||||
@ -187,5 +190,5 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
|
||||
f.setLength(0)
|
||||
f.write(ourProcessID.toByteArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,4 +29,11 @@ abstract class ServiceHubInternal : ServiceHub {
|
||||
* itself, at which point this method would not be needed (by the scheduler)
|
||||
*/
|
||||
abstract fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T>
|
||||
|
||||
override fun <T: Any>invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ListenableFuture<T> {
|
||||
val logicRef = protocolLogicRefFactory.create(logicType, *args)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val logic = protocolLogicRefFactory.toProtocolLogic(logicRef) as ProtocolLogic<T>
|
||||
return startProtocol(logicType.simpleName, logic)
|
||||
}
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
package com.r3corda.demos
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.contracts.InterestRateSwap
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.logElapsedTime
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.CordaPluginRegistry
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
@ -241,6 +243,16 @@ object CliParamsSpec {
|
||||
val nonOptions = parser.nonOptions()
|
||||
}
|
||||
|
||||
class IRSDemoPluginRegistry : CordaPluginRegistry {
|
||||
override val webApis: List<Class<*>> = listOf(InterestRateSwapAPI::class.java)
|
||||
override val protocolLogicClassNameWhitelist: Set<String> = setOf(AutoOfferProtocol.Requester::class.java.name,
|
||||
UpdateBusinessDayProtocol.Broadcast::class.java.name,
|
||||
ExitServerProtocol.Broadcast::class.java.name)
|
||||
override val protocolArgsClassNameWhitelist: Set<String> = setOf(InterestRateSwap.State::class.java.name,
|
||||
java.time.LocalDate::class.java.name,
|
||||
kotlin.Int::class.java.name)
|
||||
}
|
||||
|
||||
private class NotSetupException: Throwable {
|
||||
constructor(message: String): super(message) {}
|
||||
}
|
||||
@ -374,8 +386,7 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi
|
||||
}
|
||||
|
||||
val node = logElapsedTime("Node startup") {
|
||||
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock(),
|
||||
listOf(InterestRateSwapAPI::class.java)).start()
|
||||
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start()
|
||||
}
|
||||
|
||||
// TODO: This should all be replaced by the identity service being updated
|
||||
|
@ -76,8 +76,7 @@ fun main(args: Array<String>) {
|
||||
val apiAddr = HostAndPort.fromParts(myNetAddr.hostText, myNetAddr.port + 1)
|
||||
|
||||
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, apiAddr, config, networkMapAddress,
|
||||
advertisedServices, DemoClock(),
|
||||
listOf(InterestRateSwapAPI::class.java)).setup().start() }
|
||||
advertisedServices, DemoClock()).setup().start() }
|
||||
|
||||
val notary = node.services.networkMapCache.notaryNodes[0]
|
||||
|
||||
|
@ -1,15 +1,16 @@
|
||||
package com.r3corda.demos.api
|
||||
|
||||
import com.r3corda.contracts.InterestRateSwap
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.linearHeadsOfType
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.demos.protocols.AutoOfferProtocol
|
||||
import com.r3corda.demos.protocols.ExitServerProtocol
|
||||
import com.r3corda.demos.protocols.UpdateBusinessDayProtocol
|
||||
import com.r3corda.node.api.APIServer
|
||||
import com.r3corda.node.api.ProtocolClassRef
|
||||
import com.r3corda.node.api.StatesQuery
|
||||
import java.net.URI
|
||||
import java.time.LocalDate
|
||||
import java.time.LocalDateTime
|
||||
import javax.ws.rs.*
|
||||
import javax.ws.rs.core.MediaType
|
||||
import javax.ws.rs.core.Response
|
||||
@ -35,23 +36,23 @@ import javax.ws.rs.core.Response
|
||||
* or if the demodate or population of deals should be reset (will only work while persistence is disabled).
|
||||
*/
|
||||
@Path("irs")
|
||||
class InterestRateSwapAPI(val api: APIServer) {
|
||||
class InterestRateSwapAPI(val services: ServiceHub) {
|
||||
|
||||
private val logger = loggerFor<InterestRateSwapAPI>()
|
||||
|
||||
private fun generateDealLink(deal: InterestRateSwap.State) = "/api/irs/deals/" + deal.common.tradeID
|
||||
|
||||
private fun getDealByRef(ref: String): InterestRateSwap.State? {
|
||||
val states = api.queryStates(StatesQuery.selectDeal(ref))
|
||||
return if (states.isEmpty()) null else {
|
||||
val deals = api.fetchStates(states).values.map { it?.data as InterestRateSwap.State }.filterNotNull()
|
||||
val states = services.walletService.linearHeadsOfType<InterestRateSwap.State>().filterValues { it.state.data.ref == ref }
|
||||
return if (states.isEmpty()) null else {
|
||||
val deals = states.values.map { it.state.data }
|
||||
return if (deals.isEmpty()) null else deals[0]
|
||||
}
|
||||
}
|
||||
|
||||
private fun getAllDeals(): Array<InterestRateSwap.State> {
|
||||
val states = api.queryStates(StatesQuery.selectAllDeals())
|
||||
val swaps = api.fetchStates(states).values.map { it?.data as InterestRateSwap.State }.filterNotNull().toTypedArray()
|
||||
val states = services.walletService.linearHeadsOfType<InterestRateSwap.State>()
|
||||
val swaps = states.values.map { it.state.data }.toTypedArray()
|
||||
return swaps
|
||||
}
|
||||
|
||||
@ -64,7 +65,7 @@ class InterestRateSwapAPI(val api: APIServer) {
|
||||
@Path("deals")
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
fun storeDeal(newDeal: InterestRateSwap.State): Response {
|
||||
api.invokeProtocolSync(ProtocolClassRef(AutoOfferProtocol.Requester::class.java.name!!), mapOf("dealToBeOffered" to newDeal))
|
||||
services.invokeProtocolSync<SignedTransaction>(AutoOfferProtocol.Requester::class.java,newDeal)
|
||||
return Response.created(URI.create(generateDealLink(newDeal))).build()
|
||||
}
|
||||
|
||||
@ -84,10 +85,10 @@ class InterestRateSwapAPI(val api: APIServer) {
|
||||
@Path("demodate")
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
fun storeDemoDate(newDemoDate: LocalDate): Response {
|
||||
val priorDemoDate = api.serverTime().toLocalDate()
|
||||
val priorDemoDate = fetchDemoDate()
|
||||
// Can only move date forwards
|
||||
if (newDemoDate.isAfter(priorDemoDate)) {
|
||||
api.invokeProtocolSync(ProtocolClassRef(UpdateBusinessDayProtocol.Broadcast::class.java.name!!), mapOf("date" to newDemoDate))
|
||||
services.invokeProtocolSync<Unit>(UpdateBusinessDayProtocol.Broadcast::class.java,newDemoDate)
|
||||
return Response.ok().build()
|
||||
}
|
||||
val msg = "demodate is already $priorDemoDate and can only be updated with a later date"
|
||||
@ -99,14 +100,14 @@ class InterestRateSwapAPI(val api: APIServer) {
|
||||
@Path("demodate")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
fun fetchDemoDate(): LocalDate {
|
||||
return api.serverTime().toLocalDate()
|
||||
return LocalDateTime.now(services.clock).toLocalDate()
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("restart")
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
fun exitServer(): Response {
|
||||
api.invokeProtocolSync(ProtocolClassRef(ExitServerProtocol.Broadcast::class.java.name!!), mapOf("exitCode" to 83))
|
||||
services.invokeProtocolSync<Boolean>(ExitServerProtocol.Broadcast::class.java,83)
|
||||
return Response.ok().build()
|
||||
}
|
||||
}
|
||||
|
@ -37,20 +37,19 @@ object ExitServerProtocol {
|
||||
* This takes a Java Integer rather than Kotlin Int as that is what we end up with in the calling map and currently
|
||||
* we do not support coercing numeric types in the reflective search for matching constructors
|
||||
*/
|
||||
class Broadcast(@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") val exitCode: Integer) : ProtocolLogic<Boolean>() {
|
||||
class Broadcast(@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") val exitCode: Int) : ProtocolLogic<Boolean>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Boolean {
|
||||
if (enabled) {
|
||||
val rc = exitCode.toInt()
|
||||
val message = ExitMessage(rc)
|
||||
val message = ExitMessage(exitCode)
|
||||
|
||||
for (recipient in serviceHub.networkMapCache.partyNodes) {
|
||||
doNextRecipient(recipient, message)
|
||||
}
|
||||
// Sleep a little in case any async message delivery to other nodes needs to happen
|
||||
Strand.sleep(1, TimeUnit.SECONDS)
|
||||
System.exit(rc)
|
||||
System.exit(exitCode)
|
||||
}
|
||||
return enabled
|
||||
}
|
||||
|
@ -0,0 +1,2 @@
|
||||
# Register a ServiceLoader service extending from com.r3corda.node.CordaPluginRegistry
|
||||
com.r3corda.demos.IRSDemoPluginRegistry
|
Loading…
x
Reference in New Issue
Block a user