mirror of
https://github.com/corda/corda.git
synced 2025-02-07 11:30:22 +00:00
Refactoring related to BFT state persistence/transfer (#829)
* Remove unused type param * If we drop Runnable we can use conciser syntax * Sometimes we need the fully-fledged object, so retire separate handle class * Implement IntelliJ suggestion
This commit is contained in:
parent
ec0e0dd442
commit
77ab6d4af3
@ -104,7 +104,7 @@ fun <T> ListenableFuture<T>.failure(executor: Executor, body: (Throwable) -> Uni
|
|||||||
infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) }
|
infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) }
|
||||||
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
|
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
|
||||||
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
|
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
|
||||||
fun <T> ListenableFuture<T>.andForget(log: Logger) = failure(RunOnCallerThread) { log.error("Background task failed:", it) }
|
fun ListenableFuture<*>.andForget(log: Logger) = failure(RunOnCallerThread) { log.error("Background task failed:", it) }
|
||||||
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
|
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
|
||||||
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, { (mapper as (F?) -> T)(it) })
|
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, { (mapper as (F?) -> T)(it) })
|
||||||
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
|
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
|
||||||
|
@ -542,7 +542,7 @@ abstract class AbstractAttachment(dataLoader: () -> ByteArray) : Attachment {
|
|||||||
val storage = serviceHub.storageService.attachments
|
val storage = serviceHub.storageService.attachments
|
||||||
return {
|
return {
|
||||||
val a = storage.openAttachment(id) ?: throw MissingAttachmentsException(listOf(id))
|
val a = storage.openAttachment(id) ?: throw MissingAttachmentsException(listOf(id))
|
||||||
if (a is AbstractAttachment) a.attachmentData else a.open().use { it.readBytes() }
|
(a as? AbstractAttachment)?.attachmentData ?: a.open().use { it.readBytes() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,9 +171,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
lateinit var scheduler: NodeSchedulerService
|
lateinit var scheduler: NodeSchedulerService
|
||||||
lateinit var schemas: SchemaService
|
lateinit var schemas: SchemaService
|
||||||
lateinit var auditService: AuditService
|
lateinit var auditService: AuditService
|
||||||
protected val runOnStop: ArrayList<Runnable> = ArrayList()
|
protected val runOnStop = ArrayList<() -> Any?>()
|
||||||
lateinit var database: Database
|
lateinit var database: Database
|
||||||
protected var dbCloser: Runnable? = null
|
protected var dbCloser: (() -> Any?)? = null
|
||||||
private lateinit var rpcFlows: List<Class<out FlowLogic<*>>>
|
private lateinit var rpcFlows: List<Class<out FlowLogic<*>>>
|
||||||
|
|
||||||
var isPreviousCheckpointsPresent = false
|
var isPreviousCheckpointsPresent = false
|
||||||
@ -225,7 +225,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
smm.tokenizableServices.addAll(tokenizableServices)
|
smm.tokenizableServices.addAll(tokenizableServices)
|
||||||
|
|
||||||
if (serverThread is ExecutorService) {
|
if (serverThread is ExecutorService) {
|
||||||
runOnStop += Runnable {
|
runOnStop += {
|
||||||
// We wait here, even though any in-flight messages should have been drained away because the
|
// We wait here, even though any in-flight messages should have been drained away because the
|
||||||
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
|
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
|
||||||
// arbitrary and might be inappropriate.
|
// arbitrary and might be inappropriate.
|
||||||
@ -256,11 +256,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
|
|
||||||
initUploaders()
|
initUploaders()
|
||||||
|
|
||||||
runOnStop += Runnable { network.stop() }
|
runOnStop += network::stop
|
||||||
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
|
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
|
||||||
smm.start()
|
smm.start()
|
||||||
// Shut down the SMM so no Fibers are scheduled.
|
// Shut down the SMM so no Fibers are scheduled.
|
||||||
runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) }
|
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
}
|
}
|
||||||
started = true
|
started = true
|
||||||
@ -573,8 +573,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
this.database = database
|
this.database = database
|
||||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||||
log.info("Connected to ${database.vendor} database.")
|
log.info("Connected to ${database.vendor} database.")
|
||||||
dbCloser = Runnable { toClose.close() }
|
toClose::close.let {
|
||||||
runOnStop += dbCloser!!
|
dbCloser = it
|
||||||
|
runOnStop += it
|
||||||
|
}
|
||||||
database.transaction {
|
database.transaction {
|
||||||
insideTransaction()
|
insideTransaction()
|
||||||
}
|
}
|
||||||
@ -711,7 +713,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
|
|
||||||
// Run shutdown hooks in opposite order to starting
|
// Run shutdown hooks in opposite order to starting
|
||||||
for (toRun in runOnStop.reversed()) {
|
for (toRun in runOnStop.reversed()) {
|
||||||
toRun.run()
|
toRun()
|
||||||
}
|
}
|
||||||
runOnStop.clear()
|
runOnStop.clear()
|
||||||
}
|
}
|
||||||
|
@ -226,7 +226,7 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
override fun startMessagingService(rpcOps: RPCOps) {
|
override fun startMessagingService(rpcOps: RPCOps) {
|
||||||
// Start up the embedded MQ server
|
// Start up the embedded MQ server
|
||||||
messageBroker?.apply {
|
messageBroker?.apply {
|
||||||
runOnStop += Runnable { stop() }
|
runOnStop += this::stop
|
||||||
start()
|
start()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,7 +248,7 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) {
|
RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) {
|
||||||
val provider = RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
val provider = RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
||||||
provider.start()
|
provider.start()
|
||||||
runOnStop += Runnable { provider.stop() }
|
runOnStop += provider::stop
|
||||||
provider
|
provider
|
||||||
}
|
}
|
||||||
else -> PersistentUniquenessProvider()
|
else -> PersistentUniquenessProvider()
|
||||||
@ -277,7 +277,7 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
"-tcpAllowOthers",
|
"-tcpAllowOthers",
|
||||||
"-tcpDaemon",
|
"-tcpDaemon",
|
||||||
"-key", "node", databaseName)
|
"-key", "node", databaseName)
|
||||||
runOnStop += Runnable { server.stop() }
|
runOnStop += server::stop
|
||||||
val url = server.start().url
|
val url = server.start().url
|
||||||
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ import java.nio.file.Files
|
|||||||
* Each instance of this class creates such a configHome, accessible via [path].
|
* Each instance of this class creates such a configHome, accessible via [path].
|
||||||
* The files are deleted on [close] typically via [use], see [PathManager] for details.
|
* The files are deleted on [close] typically via [use], see [PathManager] for details.
|
||||||
*/
|
*/
|
||||||
class BFTSMaRtConfig(replicaAddresses: List<HostAndPort>) : PathManager(Files.createTempDirectory("bft-smart-config")) {
|
class BFTSMaRtConfig(replicaAddresses: List<HostAndPort>) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
|
||||||
companion object {
|
companion object {
|
||||||
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
|
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ import java.io.Closeable
|
|||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
internal class DeleteOnExitPath(internal val path: Path) {
|
private class DeleteOnExitPath(internal val path: Path) {
|
||||||
private val shutdownHook = addShutdownHook { dispose() }
|
private val shutdownHook = addShutdownHook { dispose() }
|
||||||
internal fun dispose() {
|
internal fun dispose() {
|
||||||
path.toFile().deleteRecursively()
|
path.toFile().deleteRecursively()
|
||||||
@ -13,31 +13,31 @@ internal class DeleteOnExitPath(internal val path: Path) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
open class PathHandle internal constructor(private val deleteOnExitPath: DeleteOnExitPath, private val handleCounter: AtomicInteger) : Closeable {
|
|
||||||
val path
|
|
||||||
get(): Path {
|
|
||||||
val path = deleteOnExitPath.path
|
|
||||||
check(handleCounter.get() != 0) { "Defunct path: $path" }
|
|
||||||
return path
|
|
||||||
}
|
|
||||||
|
|
||||||
init {
|
|
||||||
handleCounter.incrementAndGet()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun handle() = PathHandle(deleteOnExitPath, handleCounter)
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
if (handleCounter.decrementAndGet() == 0) {
|
|
||||||
deleteOnExitPath.dispose()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An instance of this class is a handle on a temporary [path].
|
* An instance of this class is a handle on a temporary [path].
|
||||||
* If necessary, additional handles on the same path can be created using the [handle] method.
|
* If necessary, additional handles on the same path can be created using the [handle] method.
|
||||||
* The path is (recursively) deleted when [close] is called on the last handle, typically at the end of a [use] expression.
|
* The path is (recursively) deleted when [close] is called on the last handle, typically at the end of a [use] expression.
|
||||||
* The value of eager cleanup of temporary files is that there are cases when shutdown hooks don't run e.g. SIGKILL.
|
* The value of eager cleanup of temporary files is that there are cases when shutdown hooks don't run e.g. SIGKILL.
|
||||||
*/
|
*/
|
||||||
open class PathManager(path: Path) : PathHandle(DeleteOnExitPath(path), AtomicInteger())
|
open class PathManager<T : PathManager<T>>(path: Path) : Closeable {
|
||||||
|
private val deleteOnExitPath = DeleteOnExitPath(path)
|
||||||
|
private val handleCounter = AtomicInteger(1)
|
||||||
|
val path
|
||||||
|
get(): Path {
|
||||||
|
val path = deleteOnExitPath.path
|
||||||
|
check(handleCounter.get() != 0) { "Defunct path: $path" }
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
||||||
|
fun handle(): T {
|
||||||
|
handleCounter.incrementAndGet()
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
return this as T
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
if (handleCounter.decrementAndGet() == 0) {
|
||||||
|
deleteOnExitPath.dispose()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -8,9 +8,11 @@ import kotlin.test.assertFalse
|
|||||||
import kotlin.test.assertTrue
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class PathManagerTests {
|
class PathManagerTests {
|
||||||
|
private class MyPathManager : PathManager<MyPathManager>(Files.createTempFile(MyPathManager::class.simpleName, null))
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `path deleted when manager closed`() {
|
fun `path deleted when manager closed`() {
|
||||||
val manager = PathManager(Files.createTempFile(javaClass.simpleName, null))
|
val manager = MyPathManager()
|
||||||
val leakedPath = manager.use {
|
val leakedPath = manager.use {
|
||||||
it.path.also { assertTrue(it.exists()) }
|
it.path.also { assertTrue(it.exists()) }
|
||||||
}
|
}
|
||||||
@ -20,7 +22,7 @@ class PathManagerTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `path deleted when handle closed`() {
|
fun `path deleted when handle closed`() {
|
||||||
val handle = PathManager(Files.createTempFile(javaClass.simpleName, null)).use {
|
val handle = MyPathManager().use {
|
||||||
it.handle()
|
it.handle()
|
||||||
}
|
}
|
||||||
val leakedPath = handle.use {
|
val leakedPath = handle.use {
|
||||||
|
@ -243,7 +243,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun manuallyCloseDB() {
|
fun manuallyCloseDB() {
|
||||||
dbCloser?.run()
|
dbCloser?.invoke()
|
||||||
dbCloser = null
|
dbCloser = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user