mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Merge commit 'bc330bd9890d41904bbeea6e579f5fc4438872b6' into christians/merge-ENT-2414
This commit is contained in:
@ -36,8 +36,6 @@ dependencies {
|
||||
// TODO: Remove this dependency and the code that requires it
|
||||
compile "commons-fileupload:commons-fileupload:$fileupload_version"
|
||||
|
||||
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
|
||||
|
||||
// TypeSafe Config: for simple and human friendly config files.
|
||||
compile "com.typesafe:config:$typesafe_config_version"
|
||||
|
||||
|
@ -24,7 +24,7 @@ import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.Try
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||
import org.apache.activemq.artemis.reader.MessageUtil
|
||||
@ -222,6 +222,11 @@ object RPCApi {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown if the RPC reply body couldn't be deserialized.
|
||||
*/
|
||||
class FailedToDeserializeReply(val id: InvocationId, cause: Throwable) : RuntimeException("Failed to deserialize RPC reply: ${cause.message}", cause)
|
||||
|
||||
companion object {
|
||||
private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try {
|
||||
serialize(context = context)
|
||||
@ -236,10 +241,18 @@ object RPCApi {
|
||||
RPCApi.ServerToClient.Tag.RPC_REPLY -> {
|
||||
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
|
||||
// The result here is a Try<> that represents the attempt to try the operation on the server side.
|
||||
// If anything goes wrong with deserialisation of the response, we propagate it differently because
|
||||
// we also need to pass through the invocation and dedupe IDs.
|
||||
val result: Try<Any?> = try {
|
||||
message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
} catch (e: Exception) {
|
||||
throw FailedToDeserializeReply(id, e)
|
||||
}
|
||||
RpcReply(
|
||||
id = id,
|
||||
deduplicationIdentity = deduplicationIdentity,
|
||||
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
result = result
|
||||
)
|
||||
}
|
||||
RPCApi.ServerToClient.Tag.OBSERVATION -> {
|
||||
|
@ -12,6 +12,7 @@ package net.corda.nodeapi.internal
|
||||
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.internal.buildNamed
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
@ -19,11 +20,11 @@ import java.util.concurrent.atomic.AtomicLong
|
||||
/**
|
||||
* A class allowing the deduplication of a strictly incrementing sequence number.
|
||||
*/
|
||||
class DeduplicationChecker(cacheExpiry: Duration) {
|
||||
class DeduplicationChecker(cacheExpiry: Duration, name: String = "DeduplicationChecker") {
|
||||
// dedupe identity -> watermark cache
|
||||
private val watermarkCache = Caffeine.newBuilder()
|
||||
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
|
||||
.build(WatermarkCacheLoader)
|
||||
.buildNamed("${name}_watermark", WatermarkCacheLoader)
|
||||
|
||||
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong> {
|
||||
override fun load(key: Any) = AtomicLong(-1)
|
||||
|
@ -0,0 +1,4 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
// TODO: Add to Corda node.conf to allow customisation
|
||||
const val NODE_INFO_DIRECTORY = "additional-node-infos"
|
@ -12,7 +12,6 @@ package net.corda.nodeapi.internal.network
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
@ -56,11 +55,11 @@ import kotlin.streams.toList
|
||||
*/
|
||||
// TODO Move this to tools:bootstrapper
|
||||
class NetworkBootstrapper
|
||||
@VisibleForTesting
|
||||
internal constructor(private val initSerEnv: Boolean,
|
||||
private val embeddedCordaJar: () -> InputStream,
|
||||
private val nodeInfosGenerator: (List<Path>) -> List<Path>,
|
||||
private val contractsJarConverter: (Path) -> ContractsJar) {
|
||||
@VisibleForTesting
|
||||
internal constructor(private val initSerEnv: Boolean,
|
||||
private val embeddedCordaJar: () -> InputStream,
|
||||
private val nodeInfosGenerator: (List<Path>) -> List<Path>,
|
||||
private val contractsJarConverter: (Path) -> ContractsJar) {
|
||||
|
||||
constructor() : this(
|
||||
initSerEnv = true,
|
||||
@ -112,7 +111,7 @@ class NetworkBootstrapper
|
||||
process.destroyForcibly()
|
||||
throw IllegalStateException("Error while generating node info file. Please check the logs in $logsDir.")
|
||||
}
|
||||
check(process.exitValue() == 0) { "Error while generating node info file. Please check the logs in $logsDir." }
|
||||
check(process.exitValue() == 0) { "Error while generating node info file. Please check the logs in $logsDir." }
|
||||
return nodeDir.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
|
||||
}
|
||||
@ -279,7 +278,7 @@ class NetworkBootstrapper
|
||||
|
||||
private fun distributeNodeInfos(nodeDirs: List<Path>, nodeInfoFiles: List<Path>) {
|
||||
for (nodeDir in nodeDirs) {
|
||||
val additionalNodeInfosDir = (nodeDir / CordformNode.NODE_INFO_DIRECTORY).createDirectories()
|
||||
val additionalNodeInfosDir = (nodeDir / NODE_INFO_DIRECTORY).createDirectories()
|
||||
for (nodeInfoFile in nodeInfoFiles) {
|
||||
nodeInfoFile.copyToDirectory(additionalNodeInfosDir, REPLACE_EXISTING)
|
||||
}
|
||||
@ -374,10 +373,10 @@ class NetworkBootstrapper
|
||||
|
||||
private fun NodeInfo.notaryIdentity(): Party {
|
||||
return when (legalIdentities.size) {
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
1 -> legalIdentities[0]
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
2 -> legalIdentities[1]
|
||||
else -> throw IllegalArgumentException("Not sure how to get the notary identity in this scenerio: $this")
|
||||
}
|
||||
|
@ -10,9 +10,9 @@
|
||||
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import rx.Subscription
|
||||
@ -106,10 +106,11 @@ class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseabl
|
||||
private fun poll() {
|
||||
nodeDataMapBox.locked {
|
||||
for (nodeData in values) {
|
||||
nodeData.nodeDir.list { paths -> paths
|
||||
.filter { it.isRegularFile() }
|
||||
.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }
|
||||
.forEach { processPath(nodeData, it) }
|
||||
nodeData.nodeDir.list { paths ->
|
||||
paths
|
||||
.filter { it.isRegularFile() }
|
||||
.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }
|
||||
.forEach { processPath(nodeData, it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -159,7 +160,7 @@ class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseabl
|
||||
* Convenience holder for all the paths and files relative to a single node.
|
||||
*/
|
||||
private class NodeData(val nodeDir: Path) {
|
||||
val additionalNodeInfoDirectory: Path = nodeDir.resolve(CordformNode.NODE_INFO_DIRECTORY)
|
||||
val additionalNodeInfoDirectory: Path = nodeDir.resolve(NODE_INFO_DIRECTORY)
|
||||
// Map from Path to its lastModifiedTime.
|
||||
val previouslySeenFiles = mutableMapOf<Path, FileTime>()
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.internal.buildNamed
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -72,7 +73,7 @@ class HibernateConfiguration(
|
||||
}
|
||||
}
|
||||
|
||||
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).build<Set<MappedSchema>, SessionFactory>()
|
||||
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).buildNamed<Set<MappedSchema>, SessionFactory>("HibernateConfiguration_sessionFactories")
|
||||
|
||||
val sessionFactoryForRegisteredSchemas = schemas.let {
|
||||
logger.info("Init HibernateConfiguration for schemas: $it")
|
||||
|
@ -1,7 +1,6 @@
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.crypto.secureRandomBytes
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -11,6 +10,7 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
||||
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.nodeapi.internal.config.toConfig
|
||||
|
@ -10,11 +10,11 @@
|
||||
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.write
|
||||
import net.corda.nodeapi.eventually
|
||||
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
@ -44,12 +44,12 @@ class NodeInfoFilesCopierTest {
|
||||
private val rootPath get() = folder.root.toPath()
|
||||
private val scheduler = TestScheduler()
|
||||
|
||||
private fun nodeDir(nodeBaseDir : String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
|
||||
private fun nodeDir(nodeBaseDir: String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
|
||||
|
||||
private val node1RootPath by lazy { nodeDir(NODE_1_PATH) }
|
||||
private val node2RootPath by lazy { nodeDir(NODE_2_PATH) }
|
||||
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
|
||||
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
|
||||
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(NODE_INFO_DIRECTORY) }
|
||||
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(NODE_INFO_DIRECTORY) }
|
||||
|
||||
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
|
||||
|
||||
|
Reference in New Issue
Block a user