mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
Merge pull request #555 from corda/aslemmer-fix-h2-and-copycat-server-leak
Shutdown copycat server and h2 database on Node shutdown
This commit is contained in:
@ -196,7 +196,10 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
|
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
|
||||||
return when (type) {
|
return when (type) {
|
||||||
RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) {
|
RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) {
|
||||||
RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
val provider = RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
||||||
|
provider.start()
|
||||||
|
runOnStop += Runnable { provider.stop() }
|
||||||
|
provider
|
||||||
}
|
}
|
||||||
else -> PersistentUniquenessProvider()
|
else -> PersistentUniquenessProvider()
|
||||||
}
|
}
|
||||||
@ -224,6 +227,7 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
"-tcpAllowOthers",
|
"-tcpAllowOthers",
|
||||||
"-tcpDaemon",
|
"-tcpDaemon",
|
||||||
"-key", "node", databaseName)
|
"-key", "node", databaseName)
|
||||||
|
runOnStop += Runnable { server.stop() }
|
||||||
val url = server.start().url
|
val url = server.start().url
|
||||||
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
||||||
}
|
}
|
||||||
|
@ -42,14 +42,20 @@ import javax.annotation.concurrent.ThreadSafe
|
|||||||
* @param config SSL configuration
|
* @param config SSL configuration
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterAddresses: List<HostAndPort>,
|
class RaftUniquenessProvider(
|
||||||
db: Database, config: SSLConfiguration) : UniquenessProvider, SingletonSerializeAsToken() {
|
val storagePath: Path,
|
||||||
|
val myAddress: HostAndPort,
|
||||||
|
val clusterAddresses: List<HostAndPort>,
|
||||||
|
val db: Database,
|
||||||
|
val config: SSLConfiguration
|
||||||
|
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = loggerFor<RaftUniquenessProvider>()
|
private val log = loggerFor<RaftUniquenessProvider>()
|
||||||
private val DB_TABLE_NAME = "notary_committed_states"
|
private val DB_TABLE_NAME = "notary_committed_states"
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _clientFuture: CompletableFuture<CopycatClient>
|
private lateinit var _clientFuture: CompletableFuture<CopycatClient>
|
||||||
|
private lateinit var server: CopycatServer
|
||||||
/**
|
/**
|
||||||
* Copycat clients are responsible for connecting to the cluster and submitting commands and queries that operate
|
* Copycat clients are responsible for connecting to the cluster and submitting commands and queries that operate
|
||||||
* on the cluster's replicated state machine.
|
* on the cluster's replicated state machine.
|
||||||
@ -57,7 +63,7 @@ class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterA
|
|||||||
private val client: CopycatClient
|
private val client: CopycatClient
|
||||||
get() = _clientFuture.get()
|
get() = _clientFuture.get()
|
||||||
|
|
||||||
init {
|
fun start() {
|
||||||
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
||||||
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(db, DB_TABLE_NAME) }
|
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(db, DB_TABLE_NAME) }
|
||||||
val address = Address(myAddress.host, myAddress.port)
|
val address = Address(myAddress.host, myAddress.port)
|
||||||
@ -65,7 +71,7 @@ class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterA
|
|||||||
val transport = buildTransport(config)
|
val transport = buildTransport(config)
|
||||||
val serializer = Serializer()
|
val serializer = Serializer()
|
||||||
|
|
||||||
val server = CopycatServer.builder(address)
|
server = CopycatServer.builder(address)
|
||||||
.withStateMachine(stateMachineFactory)
|
.withStateMachine(stateMachineFactory)
|
||||||
.withStorage(storage)
|
.withStorage(storage)
|
||||||
.withServerTransport(transport)
|
.withServerTransport(transport)
|
||||||
@ -89,6 +95,10 @@ class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterA
|
|||||||
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun stop() {
|
||||||
|
server.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
private fun buildStorage(storagePath: Path): Storage? {
|
private fun buildStorage(storagePath: Path): Storage? {
|
||||||
return Storage.builder()
|
return Storage.builder()
|
||||||
.withDirectory(storagePath.toFile())
|
.withDirectory(storagePath.toFile())
|
||||||
|
Reference in New Issue
Block a user