mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
[CORDA-1879]: Ensure Node dies on unrecoverable errors. (#4213)
This commit is contained in:
parent
ac23fcdf24
commit
dc62b20c5d
@ -204,13 +204,13 @@ class NodeMonitorModel : AutoCloseable {
|
|||||||
val nodeInfo = _connection.proxy.nodeInfo()
|
val nodeInfo = _connection.proxy.nodeInfo()
|
||||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||||
_connection
|
_connection
|
||||||
} catch (throwable: Throwable) {
|
} catch (exception: Exception) {
|
||||||
if (shouldRetry) {
|
if (shouldRetry) {
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
logger.info("Exception upon establishing connection: {}", throwable.message)
|
logger.info("Exception upon establishing connection: {}", exception.message)
|
||||||
null
|
null
|
||||||
} else {
|
} else {
|
||||||
throw throwable
|
throw exception
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance")) {
|
|||||||
nodeIsShut.onCompleted()
|
nodeIsShut.onCompleted()
|
||||||
} catch (e: ActiveMQSecurityException) {
|
} catch (e: ActiveMQSecurityException) {
|
||||||
// nothing here - this happens if trying to connect before the node is started
|
// nothing here - this happens if trying to connect before the node is started
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
nodeIsShut.onError(e)
|
nodeIsShut.onError(e)
|
||||||
}
|
}
|
||||||
}, 1, 1, TimeUnit.SECONDS)
|
}, 1, 1, TimeUnit.SECONDS)
|
||||||
|
@ -564,8 +564,8 @@ class RPCClientProxyHandler(
|
|||||||
observationExecutorPool.run(k) {
|
observationExecutorPool.run(k) {
|
||||||
try {
|
try {
|
||||||
m[k]?.onError(ConnectionFailureException())
|
m[k]?.onError(ConnectionFailureException())
|
||||||
} catch (th: Throwable) {
|
} catch (e: Exception) {
|
||||||
log.error("Unexpected exception when RPC connection failure handling", th)
|
log.error("Unexpected exception when RPC connection failure handling", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import net.corda.core.KeepForDJVM
|
|||||||
import net.corda.core.internal.extractFile
|
import net.corda.core.internal.extractFile
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import java.io.FileNotFoundException
|
import java.io.FileNotFoundException
|
||||||
|
import java.io.IOException
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
@ -36,10 +37,10 @@ interface Attachment : NamedByHash {
|
|||||||
@JvmDefault
|
@JvmDefault
|
||||||
fun openAsJAR(): JarInputStream {
|
fun openAsJAR(): JarInputStream {
|
||||||
val stream = open()
|
val stream = open()
|
||||||
try {
|
return try {
|
||||||
return JarInputStream(stream)
|
JarInputStream(stream)
|
||||||
} catch (t: Throwable) {
|
} catch (e: IOException) {
|
||||||
stream.use { throw t }
|
stream.use { throw e }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,8 +71,8 @@ fun <V, W> CordaFuture<out V>.flatMap(transform: (V) -> CordaFuture<out W>): Cor
|
|||||||
thenMatch(success@ {
|
thenMatch(success@ {
|
||||||
result.captureLater(try {
|
result.captureLater(try {
|
||||||
transform(it)
|
transform(it)
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
result.setException(t)
|
result.setException(e)
|
||||||
return@success
|
return@success
|
||||||
})
|
})
|
||||||
}, {
|
}, {
|
||||||
@ -128,8 +128,8 @@ interface ValueOrException<in V> {
|
|||||||
fun capture(block: () -> V): Boolean {
|
fun capture(block: () -> V): Boolean {
|
||||||
return set(try {
|
return set(try {
|
||||||
block()
|
block()
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
return setException(t)
|
return setException(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -153,8 +153,8 @@ internal class CordaFutureImpl<V>(private val impl: CompletableFuture<V> = Compl
|
|||||||
impl.whenComplete { _, _ ->
|
impl.whenComplete { _, _ ->
|
||||||
try {
|
try {
|
||||||
callback(this)
|
callback(this)
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
log.error(listenerFailedMessage, t)
|
log.error(listenerFailedMessage, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -193,7 +193,7 @@ data class LedgerTransaction @JvmOverloads constructor(
|
|||||||
is Try.Success -> {
|
is Try.Success -> {
|
||||||
try {
|
try {
|
||||||
contractInstances.add(result.value.newInstance())
|
contractInstances.add(result.value.newInstance())
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
throw TransactionVerificationException.ContractCreationError(id, result.value.name, e)
|
throw TransactionVerificationException.ContractCreationError(id, result.value.name, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -202,7 +202,7 @@ data class LedgerTransaction @JvmOverloads constructor(
|
|||||||
contractInstances.forEach { contract ->
|
contractInstances.forEach { contract ->
|
||||||
try {
|
try {
|
||||||
contract.verify(this)
|
contract.verify(this)
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
throw TransactionVerificationException.ContractRejection(id, contract, e)
|
throw TransactionVerificationException.ContractRejection(id, contract, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,8 @@ sealed class Try<out A> {
|
|||||||
inline fun <T> on(body: () -> T): Try<T> {
|
inline fun <T> on(body: () -> T): Try<T> {
|
||||||
return try {
|
return try {
|
||||||
Success(body())
|
Success(body())
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
Failure(t)
|
Failure(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,31 +0,0 @@
|
|||||||
ext {
|
|
||||||
javaassist_version = "3.12.1.GA"
|
|
||||||
}
|
|
||||||
|
|
||||||
apply plugin: 'kotlin'
|
|
||||||
apply plugin: 'idea'
|
|
||||||
|
|
||||||
description 'A javaagent to allow hooking into Kryo'
|
|
||||||
|
|
||||||
dependencies {
|
|
||||||
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
|
||||||
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
|
||||||
compile "javassist:javassist:$javaassist_version"
|
|
||||||
compile "com.esotericsoftware:kryo:4.0.0"
|
|
||||||
compile "$quasar_group:quasar-core:$quasar_version:jdk8"
|
|
||||||
}
|
|
||||||
|
|
||||||
jar {
|
|
||||||
archiveName = "${project.name}.jar"
|
|
||||||
manifest {
|
|
||||||
attributes(
|
|
||||||
'Premain-Class': 'net.corda.kryohook.KryoHookAgent',
|
|
||||||
'Can-Redefine-Classes': 'true',
|
|
||||||
'Can-Retransform-Classes': 'true',
|
|
||||||
'Can-Set-Native-Method-Prefix': 'true',
|
|
||||||
'Implementation-Title': "KryoHook",
|
|
||||||
'Implementation-Version': rootProject.version
|
|
||||||
)
|
|
||||||
}
|
|
||||||
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
|
|
||||||
}
|
|
@ -1,164 +0,0 @@
|
|||||||
package net.corda.kryohook
|
|
||||||
|
|
||||||
import co.paralleluniverse.strands.Strand
|
|
||||||
import com.esotericsoftware.kryo.Kryo
|
|
||||||
import com.esotericsoftware.kryo.io.Output
|
|
||||||
import javassist.ClassPool
|
|
||||||
import javassist.CtClass
|
|
||||||
import java.lang.instrument.ClassFileTransformer
|
|
||||||
import java.lang.instrument.Instrumentation
|
|
||||||
import java.security.ProtectionDomain
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
|
|
||||||
class KryoHookAgent {
|
|
||||||
companion object {
|
|
||||||
@JvmStatic
|
|
||||||
fun premain(@Suppress("UNUSED_PARAMETER") argumentsString: String?, instrumentation: Instrumentation) {
|
|
||||||
Runtime.getRuntime().addShutdownHook(Thread {
|
|
||||||
val statsTrees = KryoHook.events.values.flatMap {
|
|
||||||
readTrees(it, 0).second
|
|
||||||
}
|
|
||||||
val builder = StringBuilder()
|
|
||||||
statsTrees.forEach {
|
|
||||||
prettyStatsTree(0, it, builder)
|
|
||||||
}
|
|
||||||
print(builder.toString())
|
|
||||||
})
|
|
||||||
instrumentation.addTransformer(KryoHook)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun prettyStatsTree(indent: Int, statsTree: StatsTree, builder: StringBuilder) {
|
|
||||||
when (statsTree) {
|
|
||||||
is StatsTree.Object -> {
|
|
||||||
builder.append(kotlin.CharArray(indent) { ' ' })
|
|
||||||
builder.append(statsTree.className)
|
|
||||||
builder.append(" ")
|
|
||||||
builder.append(statsTree.size)
|
|
||||||
builder.append("\n")
|
|
||||||
for (child in statsTree.children) {
|
|
||||||
prettyStatsTree(indent + 2, child, builder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The hook simply records the write() entries and exits together with the output offset at the time of the call.
|
|
||||||
* This is recorded in a StrandID -> List<StatsEvent> map.
|
|
||||||
*
|
|
||||||
* Later we "parse" these lists into a tree.
|
|
||||||
*/
|
|
||||||
object KryoHook : ClassFileTransformer {
|
|
||||||
val classPool = ClassPool.getDefault()!!
|
|
||||||
|
|
||||||
val hookClassName = javaClass.name!!
|
|
||||||
|
|
||||||
override fun transform(
|
|
||||||
loader: ClassLoader?,
|
|
||||||
className: String,
|
|
||||||
classBeingRedefined: Class<*>?,
|
|
||||||
protectionDomain: ProtectionDomain?,
|
|
||||||
classfileBuffer: ByteArray
|
|
||||||
): ByteArray? {
|
|
||||||
if (className.startsWith("java") || className.startsWith("javassist") || className.startsWith("kotlin")) {
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
return try {
|
|
||||||
val clazz = classPool.makeClass(classfileBuffer.inputStream())
|
|
||||||
instrumentClass(clazz)?.toBytecode()
|
|
||||||
} catch (throwable: Throwable) {
|
|
||||||
println("SOMETHING WENT WRONG")
|
|
||||||
throwable.printStackTrace(System.out)
|
|
||||||
null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun instrumentClass(clazz: CtClass): CtClass? {
|
|
||||||
for (method in clazz.declaredBehaviors) {
|
|
||||||
if (method.name == "write") {
|
|
||||||
val parameterTypeNames = method.parameterTypes.map { it.name }
|
|
||||||
if (parameterTypeNames == listOf("com.esotericsoftware.kryo.Kryo", "com.esotericsoftware.kryo.io.Output", "java.lang.Object")) {
|
|
||||||
if (method.isEmpty) continue
|
|
||||||
println("Instrumenting ${clazz.name}")
|
|
||||||
method.insertBefore("$hookClassName.${this::writeEnter.name}($1, $2, $3);")
|
|
||||||
method.insertAfter("$hookClassName.${this::writeExit.name}($1, $2, $3);")
|
|
||||||
return clazz
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
// StrandID -> StatsEvent map
|
|
||||||
val events = ConcurrentHashMap<Long, ArrayList<StatsEvent>>()
|
|
||||||
|
|
||||||
@JvmStatic
|
|
||||||
fun writeEnter(@Suppress("UNUSED_PARAMETER") kryo: Kryo, output: Output, obj: Any) {
|
|
||||||
events.getOrPut(Strand.currentStrand().id) { ArrayList() }.add(
|
|
||||||
StatsEvent.Enter(obj.javaClass.name, output.total())
|
|
||||||
)
|
|
||||||
}
|
|
||||||
@JvmStatic
|
|
||||||
fun writeExit(@Suppress("UNUSED_PARAMETER") kryo: Kryo, output: Output, obj: Any) {
|
|
||||||
events[Strand.currentStrand().id]!!.add(
|
|
||||||
StatsEvent.Exit(obj.javaClass.name, output.total())
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO we could add events on entries/exits to field serializers to get more info on what's being serialised.
|
|
||||||
*/
|
|
||||||
sealed class StatsEvent {
|
|
||||||
data class Enter(val className: String, val offset: Long) : StatsEvent()
|
|
||||||
data class Exit(val className: String, val offset: Long) : StatsEvent()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO add Field constructor.
|
|
||||||
*/
|
|
||||||
sealed class StatsTree {
|
|
||||||
data class Object(
|
|
||||||
val className: String,
|
|
||||||
val size: Long,
|
|
||||||
val children: List<StatsTree>
|
|
||||||
) : StatsTree()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun readTree(events: List<StatsEvent>, index: Int): Pair<Int, StatsTree> {
|
|
||||||
val event = events[index]
|
|
||||||
when (event) {
|
|
||||||
is StatsEvent.Enter -> {
|
|
||||||
val (nextIndex, children) = readTrees(events, index + 1)
|
|
||||||
val exit = events[nextIndex] as StatsEvent.Exit
|
|
||||||
require(event.className == exit.className)
|
|
||||||
return Pair(nextIndex + 1, StatsTree.Object(event.className, exit.offset - event.offset, children))
|
|
||||||
}
|
|
||||||
is StatsEvent.Exit -> {
|
|
||||||
throw IllegalStateException("Wasn't expecting Exit")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun readTrees(events: List<StatsEvent>, index: Int): Pair<Int, List<StatsTree>> {
|
|
||||||
val trees = ArrayList<StatsTree>()
|
|
||||||
var i = index
|
|
||||||
while (true) {
|
|
||||||
val event = events.getOrNull(i)
|
|
||||||
when (event) {
|
|
||||||
is StatsEvent.Enter -> {
|
|
||||||
val (nextIndex, tree) = readTree(events, i)
|
|
||||||
trees.add(tree)
|
|
||||||
i = nextIndex
|
|
||||||
}
|
|
||||||
is StatsEvent.Exit -> {
|
|
||||||
return Pair(i, trees)
|
|
||||||
}
|
|
||||||
null -> {
|
|
||||||
return Pair(i, trees)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,23 +0,0 @@
|
|||||||
What is this
|
|
||||||
------------
|
|
||||||
|
|
||||||
This is a javaagent that hooks into Kryo serializers to record a breakdown of how many bytes objects take in the output.
|
|
||||||
|
|
||||||
The dump is quite ugly now, but the in-memory representation is a simple tree so we could put some nice visualisation on
|
|
||||||
top if we want.
|
|
||||||
|
|
||||||
How do I run it
|
|
||||||
---------------
|
|
||||||
|
|
||||||
Build the agent:
|
|
||||||
```
|
|
||||||
./gradlew experimental:kryo-hook:jar
|
|
||||||
```
|
|
||||||
|
|
||||||
Add this JVM flag to what you're running:
|
|
||||||
|
|
||||||
```
|
|
||||||
-javaagent:<PROJECT>/experimental/kryo-hook/build/libs/kryo-hook.jar
|
|
||||||
```
|
|
||||||
|
|
||||||
The agent will dump the output when the JVM shuts down.
|
|
@ -191,6 +191,7 @@ object QuasarInstrumentationHook : ClassFileTransformer {
|
|||||||
} catch (throwable: Throwable) {
|
} catch (throwable: Throwable) {
|
||||||
println("SOMETHING WENT WRONG")
|
println("SOMETHING WENT WRONG")
|
||||||
throwable.printStackTrace(System.out)
|
throwable.printStackTrace(System.out)
|
||||||
|
if (throwable is VirtualMachineError) throw throwable
|
||||||
classfileBuffer
|
classfileBuffer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -220,8 +220,8 @@ object RPCApi {
|
|||||||
companion object {
|
companion object {
|
||||||
private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try {
|
private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try {
|
||||||
serialize(context = context)
|
serialize(context = context)
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
wrap(t).serialize(context = context)
|
wrap(e).serialize(context = context)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ServerToClient {
|
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ServerToClient {
|
||||||
|
@ -160,8 +160,8 @@ class CordaPersistence(
|
|||||||
var recoverableFailureCount = 0
|
var recoverableFailureCount = 0
|
||||||
fun <T> quietly(task: () -> T) = try {
|
fun <T> quietly(task: () -> T) = try {
|
||||||
task()
|
task()
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
log.warn("Cleanup task failed:", t)
|
log.warn("Cleanup task failed:", e)
|
||||||
}
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?
|
val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?
|
||||||
@ -169,7 +169,7 @@ class CordaPersistence(
|
|||||||
val answer = transaction.statement()
|
val answer = transaction.statement()
|
||||||
transaction.commit()
|
transaction.commit()
|
||||||
return answer
|
return answer
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
quietly(transaction::rollback)
|
quietly(transaction::rollback)
|
||||||
if (e is SQLException || (recoverAnyNestedSQLException && e.hasSQLExceptionCause())) {
|
if (e is SQLException || (recoverAnyNestedSQLException && e.hasSQLExceptionCause())) {
|
||||||
if (++recoverableFailureCount > recoverableFailureTolerance) throw e
|
if (++recoverableFailureCount > recoverableFailureTolerance) throw e
|
||||||
|
@ -318,7 +318,7 @@ class X509UtilitiesTest {
|
|||||||
lock.notifyAll()
|
lock.notifyAll()
|
||||||
}
|
}
|
||||||
sslServerSocket.close()
|
sslServerSocket.close()
|
||||||
} catch (ex: Throwable) {
|
} catch (ex: Exception) {
|
||||||
serverError = true
|
serverError = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -493,8 +493,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
val republishInterval = try {
|
val republishInterval = try {
|
||||||
networkMapClient.publish(signedNodeInfo)
|
networkMapClient.publish(signedNodeInfo)
|
||||||
heartbeatInterval
|
heartbeatInterval
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
log.warn("Error encountered while publishing node info, will retry again", t)
|
log.warn("Error encountered while publishing node info, will retry again", e)
|
||||||
// TODO: Exponential backoff? It should reach max interval of eventHorizon/2.
|
// TODO: Exponential backoff? It should reach max interval of eventHorizon/2.
|
||||||
1.minutes
|
1.minutes
|
||||||
}
|
}
|
||||||
|
@ -333,7 +333,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.")
|
log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.")
|
||||||
}
|
}
|
||||||
retrievedHostName
|
retrievedHostName
|
||||||
} catch (ignore: Throwable) {
|
} catch (ignore: Exception) {
|
||||||
// Cannot reach the network map service, ignore the exception and use provided P2P address instead.
|
// Cannot reach the network map service, ignore the exception and use provided P2P address instead.
|
||||||
log.warn("Cannot connect to the network map service for public IP detection.")
|
log.warn("Cannot connect to the network map service for public IP detection.")
|
||||||
null
|
null
|
||||||
|
@ -190,7 +190,7 @@ open class NodeStartup : NodeStartupLogging {
|
|||||||
node.startupComplete.then {
|
node.startupComplete.then {
|
||||||
try {
|
try {
|
||||||
InteractiveShell.runLocalShell(node::stop)
|
InteractiveShell.runLocalShell(node::stop)
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
logger.error("Shell failed to start", e)
|
logger.error("Shell failed to start", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,6 +232,7 @@ class RPCServer(
|
|||||||
log.error("Failed to send message, kicking client. Message was ${job.message}", throwable)
|
log.error("Failed to send message, kicking client. Message was ${job.message}", throwable)
|
||||||
serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString())
|
serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString())
|
||||||
invalidateClient(job.clientAddress)
|
invalidateClient(job.clientAddress)
|
||||||
|
if (throwable is VirtualMachineError) throw throwable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,8 +94,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
|||||||
override fun run() {
|
override fun run() {
|
||||||
val nextScheduleDelay = try {
|
val nextScheduleDelay = try {
|
||||||
updateNetworkMapCache()
|
updateNetworkMapCache()
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", t)
|
logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", e)
|
||||||
defaultRetryInterval
|
defaultRetryInterval
|
||||||
}
|
}
|
||||||
// Schedule the next update.
|
// Schedule the next update.
|
||||||
|
@ -219,9 +219,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
val result = logic.call()
|
val result = logic.call()
|
||||||
suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true)
|
suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true)
|
||||||
Try.Success(result)
|
Try.Success(result)
|
||||||
} catch (throwable: Throwable) {
|
} catch (t: Throwable) {
|
||||||
logger.info("Flow threw exception... sending it to flow hospital", throwable)
|
if(t is VirtualMachineError) {
|
||||||
Try.Failure<R>(throwable)
|
logger.error("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", t)
|
||||||
|
Runtime.getRuntime().halt(1)
|
||||||
|
}
|
||||||
|
logger.info("Flow raised an error... sending it to flow hospital", t)
|
||||||
|
Try.Failure<R>(t)
|
||||||
}
|
}
|
||||||
val softLocksId = if (hasSoftLockedStates) logic.runId.uuid else null
|
val softLocksId = if (hasSoftLockedStates) logic.runId.uuid else null
|
||||||
val finalEvent = when (resultOrError) {
|
val finalEvent = when (resultOrError) {
|
||||||
@ -373,8 +377,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
maySkipCheckpoint = skipPersistingCheckpoint,
|
maySkipCheckpoint = skipPersistingCheckpoint,
|
||||||
fiber = this.checkpointSerialize(context = serializationContext.value)
|
fiber = this.checkpointSerialize(context = serializationContext.value)
|
||||||
)
|
)
|
||||||
} catch (throwable: Throwable) {
|
} catch (exception: Exception) {
|
||||||
Event.Error(throwable)
|
Event.Error(exception)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We must commit the database transaction before returning from this closure otherwise Quasar may schedule
|
// We must commit the database transaction before returning from this closure otherwise Quasar may schedule
|
||||||
|
@ -43,6 +43,7 @@ import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
|||||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||||
import net.corda.serialization.internal.withTokenContext
|
import net.corda.serialization.internal.withTokenContext
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
|
import org.apache.logging.log4j.LogManager
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.security.SecureRandom
|
import java.security.SecureRandom
|
||||||
@ -135,7 +136,13 @@ class SingleThreadedStateMachineManager(
|
|||||||
val fibers = restoreFlowsFromCheckpoints()
|
val fibers = restoreFlowsFromCheckpoints()
|
||||||
metrics.register("Flows.InFlight", Gauge<Int> { mutex.content.flows.size })
|
metrics.register("Flows.InFlight", Gauge<Int> { mutex.content.flows.size })
|
||||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
||||||
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
|
if (throwable is VirtualMachineError) {
|
||||||
|
(fiber as FlowStateMachineImpl<*>).logger.error("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", throwable)
|
||||||
|
LogManager.shutdown(true)
|
||||||
|
Runtime.getRuntime().halt(1)
|
||||||
|
} else {
|
||||||
|
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
serviceHub.networkMapCache.nodeReady.then {
|
serviceHub.networkMapCache.nodeReady.then {
|
||||||
logger.info("Node ready, info: ${serviceHub.myInfo}")
|
logger.info("Node ready, info: ${serviceHub.myInfo}")
|
||||||
@ -606,7 +613,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes<Checkpoint>): Checkpoint? {
|
private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes<Checkpoint>): Checkpoint? {
|
||||||
return try {
|
return try {
|
||||||
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext!!)
|
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext!!)
|
||||||
} catch (exception: Throwable) {
|
} catch (exception: Exception) {
|
||||||
logger.error("Encountered unrestorable checkpoint!", exception)
|
logger.error("Encountered unrestorable checkpoint!", exception)
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ class TransitionExecutorImpl(
|
|||||||
for (action in transition.actions) {
|
for (action in transition.actions) {
|
||||||
try {
|
try {
|
||||||
actionExecutor.executeAction(fiber, action)
|
actionExecutor.executeAction(fiber, action)
|
||||||
} catch (exception: Throwable) {
|
} catch (exception: Exception) {
|
||||||
contextTransactionOrNull?.close()
|
contextTransactionOrNull?.close()
|
||||||
if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
|
if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
|
||||||
// If we errored while transitioning to an error state then we cannot record the additional
|
// If we errored while transitioning to an error state then we cannot record the additional
|
||||||
|
@ -77,8 +77,8 @@ class FiberDeserializationChecker {
|
|||||||
is Job.Check -> {
|
is Job.Check -> {
|
||||||
try {
|
try {
|
||||||
job.serializedFiber.checkpointDeserialize(context = checkpointSerializationContext)
|
job.serializedFiber.checkpointDeserialize(context = checkpointSerializationContext)
|
||||||
} catch (throwable: Throwable) {
|
} catch (exception: Exception) {
|
||||||
log.error("Encountered unrestorable checkpoint!", throwable)
|
log.error("Encountered unrestorable checkpoint!", exception)
|
||||||
foundUnrestorableFibers = true
|
foundUnrestorableFibers = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -276,7 +276,7 @@ class NodeVaultServiceTest {
|
|||||||
assertThat(vaultService.queryBy<Cash.State>(criteriaByLockId1).states).hasSize(3)
|
assertThat(vaultService.queryBy<Cash.State>(criteriaByLockId1).states).hasSize(3)
|
||||||
}
|
}
|
||||||
println("SOFT LOCK STATES #1 succeeded")
|
println("SOFT LOCK STATES #1 succeeded")
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
println("SOFT LOCK STATES #1 failed")
|
println("SOFT LOCK STATES #1 failed")
|
||||||
} finally {
|
} finally {
|
||||||
countDown.countDown()
|
countDown.countDown()
|
||||||
@ -292,7 +292,7 @@ class NodeVaultServiceTest {
|
|||||||
assertThat(vaultService.queryBy<Cash.State>(criteriaByLockId2).states).hasSize(3)
|
assertThat(vaultService.queryBy<Cash.State>(criteriaByLockId2).states).hasSize(3)
|
||||||
}
|
}
|
||||||
println("SOFT LOCK STATES #2 succeeded")
|
println("SOFT LOCK STATES #2 succeeded")
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
println("SOFT LOCK STATES #2 failed")
|
println("SOFT LOCK STATES #2 failed")
|
||||||
} finally {
|
} finally {
|
||||||
countDown.countDown()
|
countDown.countDown()
|
||||||
|
@ -327,7 +327,7 @@ class TLSAuthenticationTests {
|
|||||||
lock.notifyAll()
|
lock.notifyAll()
|
||||||
}
|
}
|
||||||
sslServerSocket.close()
|
sslServerSocket.close()
|
||||||
} catch (ex: Throwable) {
|
} catch (ex: Exception) {
|
||||||
serverError = true
|
serverError = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ class InterestRateSwapAPI {
|
|||||||
return try {
|
return try {
|
||||||
rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.getOrThrow()
|
rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.getOrThrow()
|
||||||
ResponseEntity.created(URI.create(generateDealLink(newDeal))).build()
|
ResponseEntity.created(URI.create(generateDealLink(newDeal))).build()
|
||||||
} catch (ex: Throwable) {
|
} catch (ex: Exception) {
|
||||||
logger.info("Exception when creating deal: $ex", ex)
|
logger.info("Exception when creating deal: $ex", ex)
|
||||||
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ex.toString())
|
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ex.toString())
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ import org.apache.qpid.proton.amqp.UnsignedInteger
|
|||||||
import org.apache.qpid.proton.codec.Data
|
import org.apache.qpid.proton.codec.Data
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
|
import java.lang.Exception
|
||||||
import java.lang.reflect.ParameterizedType
|
import java.lang.reflect.ParameterizedType
|
||||||
import java.lang.reflect.Type
|
import java.lang.reflect.Type
|
||||||
import java.lang.reflect.TypeVariable
|
import java.lang.reflect.TypeVariable
|
||||||
@ -100,8 +101,8 @@ class DeserializationInput constructor(
|
|||||||
throw NotSerializableException(amqp.mitigation)
|
throw NotSerializableException(amqp.mitigation)
|
||||||
} catch (nse: NotSerializableException) {
|
} catch (nse: NotSerializableException) {
|
||||||
throw nse
|
throw nse
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
throw NotSerializableException("Internal deserialization failure: ${t.javaClass.name}: ${t.message}").apply { initCause(t) }
|
throw NotSerializableException("Internal deserialization failure: ${e.javaClass.name}: ${e.message}").apply { initCause(e) }
|
||||||
} finally {
|
} finally {
|
||||||
objectHistory.clear()
|
objectHistory.clear()
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ include 'experimental'
|
|||||||
include 'experimental:avalanche'
|
include 'experimental:avalanche'
|
||||||
include 'experimental:behave'
|
include 'experimental:behave'
|
||||||
include 'experimental:quasar-hook'
|
include 'experimental:quasar-hook'
|
||||||
include 'experimental:kryo-hook'
|
|
||||||
include 'experimental:corda-utils'
|
include 'experimental:corda-utils'
|
||||||
include 'experimental:notary-raft'
|
include 'experimental:notary-raft'
|
||||||
include 'experimental:notary-bft-smart'
|
include 'experimental:notary-bft-smart'
|
||||||
|
@ -167,8 +167,8 @@ class DriverTests {
|
|||||||
fun `driver waits for in-process nodes to finish`() {
|
fun `driver waits for in-process nodes to finish`() {
|
||||||
fun NodeHandle.stopQuietly() = try {
|
fun NodeHandle.stopQuietly() = try {
|
||||||
stop()
|
stop()
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
t.printStackTrace()
|
e.printStackTrace()
|
||||||
}
|
}
|
||||||
|
|
||||||
val handlesFuture = openFuture<List<NodeHandle>>()
|
val handlesFuture = openFuture<List<NodeHandle>>()
|
||||||
|
@ -95,8 +95,8 @@ fun <A> poll(
|
|||||||
} else {
|
} else {
|
||||||
executorService.schedule(this, pollInterval.toMillis(), TimeUnit.MILLISECONDS)
|
executorService.schedule(this, pollInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||||
}
|
}
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
resultFuture.setException(t)
|
resultFuture.setException(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,9 @@ class ShutdownManager(private val executorService: ExecutorService) {
|
|||||||
it.value()
|
it.value()
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
log.warn("Exception while calling a shutdown action, this might create resource leaks", t)
|
log.warn("Exception while calling a shutdown action, this might create resource leaks", t)
|
||||||
|
if (t is VirtualMachineError) {
|
||||||
|
throw t
|
||||||
|
}
|
||||||
}
|
}
|
||||||
is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception)
|
is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception)
|
||||||
}
|
}
|
||||||
|
@ -86,8 +86,8 @@ fun startPublishingFixedRateInjector(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (throwable: Throwable) {
|
} catch (e: Exception) {
|
||||||
throwable.printStackTrace()
|
e.printStackTrace()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ class BlobInspector : CordaCliWrapper("blob-inspector", "Convert AMQP serialised
|
|||||||
} else {
|
} else {
|
||||||
null // Not an AMQP blob.
|
null // Not an AMQP blob.
|
||||||
}
|
}
|
||||||
} catch (t: Throwable) {
|
} catch (e: Exception) {
|
||||||
return null // Failed to parse in some other way.
|
return null // Failed to parse in some other way.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,10 +106,10 @@ data class LoadTest<T, S>(
|
|||||||
log.info("Executing $it")
|
log.info("Executing $it")
|
||||||
try {
|
try {
|
||||||
nodes.execute(it)
|
nodes.execute(it)
|
||||||
} catch (exception: Throwable) {
|
} catch (throwable: Throwable) {
|
||||||
val diagnostic = executeDiagnostic(state, newState, it, exception)
|
val diagnostic = executeDiagnostic(state, newState, it, throwable)
|
||||||
log.error(diagnostic)
|
log.error(diagnostic)
|
||||||
throw Exception(diagnostic)
|
throw if (throwable is Exception) Exception(diagnostic) else throwable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,13 +30,13 @@ class DockerInstantiator(private val volume: LocalVolume,
|
|||||||
try {
|
try {
|
||||||
localClient.killContainerCmd(container.id).exec()
|
localClient.killContainerCmd(container.id).exec()
|
||||||
LOG.info("Found running container: $instanceName killed")
|
LOG.info("Found running container: $instanceName killed")
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
//container not running
|
//container not running
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
localClient.removeContainerCmd(container.id).exec()
|
localClient.removeContainerCmd(container.id).exec()
|
||||||
LOG.info("Found existing container: $instanceName removed")
|
LOG.info("Found existing container: $instanceName removed")
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
//this *only* occurs of the container had been previously scheduled for removal
|
//this *only* occurs of the container had been previously scheduled for removal
|
||||||
//but did not complete before this attempt was begun.
|
//but did not complete before this attempt was begun.
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.bootstrapper.nodes
|
package net.corda.bootstrapper.nodes
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigException
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import net.corda.bootstrapper.Constants
|
import net.corda.bootstrapper.Constants
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -11,7 +12,7 @@ class NodeFinder(private val scratchDir: File) {
|
|||||||
return scratchDir.walkBottomUp().filter { it.name == "node.conf" && !it.absolutePath.contains(Constants.BOOTSTRAPPER_DIR_NAME) }.map {
|
return scratchDir.walkBottomUp().filter { it.name == "node.conf" && !it.absolutePath.contains(Constants.BOOTSTRAPPER_DIR_NAME) }.map {
|
||||||
try {
|
try {
|
||||||
ConfigFactory.parseFile(it) to it
|
ConfigFactory.parseFile(it) to it
|
||||||
} catch (t: Throwable) {
|
} catch (e: ConfigException) {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
}.filterNotNull()
|
}.filterNotNull()
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.bootstrapper.notaries
|
package net.corda.bootstrapper.notaries
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigException
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import net.corda.bootstrapper.Constants
|
import net.corda.bootstrapper.Constants
|
||||||
import net.corda.bootstrapper.nodes.FoundNode
|
import net.corda.bootstrapper.nodes.FoundNode
|
||||||
@ -12,7 +13,7 @@ class NotaryFinder(private val dirToSearch: File) {
|
|||||||
.map {
|
.map {
|
||||||
try {
|
try {
|
||||||
ConfigFactory.parseFile(it) to it
|
ConfigFactory.parseFile(it) to it
|
||||||
} catch (t: Throwable) {
|
} catch (e: ConfigException) {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
}.filterNotNull()
|
}.filterNotNull()
|
||||||
|
@ -45,7 +45,7 @@ class AzureSmbVolume(private val azure: Azure, private val resourceGroup: Resour
|
|||||||
cloudFileShare.createIfNotExists()
|
cloudFileShare.createIfNotExists()
|
||||||
networkParamsFolder.createIfNotExists()
|
networkParamsFolder.createIfNotExists()
|
||||||
break
|
break
|
||||||
} catch (e: Throwable) {
|
} catch (e: Exception) {
|
||||||
LOG.debug("storage account not ready, waiting")
|
LOG.debug("storage account not ready, waiting")
|
||||||
Thread.sleep(5000)
|
Thread.sleep(5000)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user