mirror of
https://github.com/corda/corda.git
synced 2025-04-19 00:27:13 +00:00
Merge fixes
This commit is contained in:
parent
02fae5f385
commit
d040945b0e
@ -63,7 +63,6 @@ val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
|
||||
class CordaPersistence(
|
||||
databaseConfig: DatabaseConfig,
|
||||
schemas: Set<MappedSchema>,
|
||||
val jdbcUrl: String,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
|
||||
) : Closeable {
|
||||
companion object {
|
||||
@ -72,11 +71,11 @@ class CordaPersistence(
|
||||
|
||||
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||
val hibernateConfig: HibernateConfiguration by lazy {
|
||||
|
||||
transaction {
|
||||
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl)
|
||||
}
|
||||
}
|
||||
|
||||
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
|
||||
|
||||
data class Boundary(val txId: UUID, val success: Boolean)
|
||||
@ -84,8 +83,11 @@ class CordaPersistence(
|
||||
private var _dataSource: DataSource? = null
|
||||
val dataSource: DataSource get() = checkNotNull(_dataSource) { "CordaPersistence not started" }
|
||||
|
||||
fun start(dataSource: DataSource) {
|
||||
private lateinit var jdbcUrl: String
|
||||
|
||||
fun start(dataSource: DataSource, jdbcUrl: String) {
|
||||
_dataSource = dataSource
|
||||
this.jdbcUrl = jdbcUrl
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||
|
@ -360,7 +360,7 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
|
||||
private fun startNodeMessagingClient(maxMessageSize: Int = MAX_MESSAGE_SIZE) {
|
||||
messagingClient!!.start(identity.public, null, maxMessageSize)
|
||||
messagingClient!!.start(identity.public, null, maxMessageSize, legalName = ALICE_NAME.toString())
|
||||
}
|
||||
|
||||
private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE,
|
||||
@ -395,6 +395,7 @@ class ArtemisMessagingTest {
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
networkMapCache,
|
||||
MetricRegistry(),
|
||||
isDrainingModeOn = { false },
|
||||
drainingModeWasChangedEvents = PublishSubject.create()).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
|
@ -79,10 +79,7 @@ import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.IncompatibleAttachmentsContractsTableName
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.nodeapi.internal.storeLegalIdentity
|
||||
import net.corda.tools.shell.InteractiveShell
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
@ -107,8 +104,6 @@ import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.TimeUnit.MINUTES
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.collections.set
|
||||
@ -180,7 +175,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
@Suppress("LeakingThis")
|
||||
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
|
||||
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
|
||||
private val metricRegistry = MetricRegistry()
|
||||
val metricRegistry = MetricRegistry()
|
||||
val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
|
||||
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()
|
||||
@Suppress("LeakingThis")
|
||||
@ -795,7 +790,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.hikariStart(props)
|
||||
database.hikariStart(props, configuration.database, schemaService)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
}
|
||||
@ -1061,7 +1056,7 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
|
||||
val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService)
|
||||
persistence.hikariStart(hikariProperties)
|
||||
persistence.hikariStart(hikariProperties, databaseConfig, schemaService)
|
||||
return persistence
|
||||
}
|
||||
|
||||
@ -1074,18 +1069,22 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
|
||||
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
|
||||
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
|
||||
}
|
||||
|
||||
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
fun CordaPersistence.hikariStart(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemaService: SchemaService) {
|
||||
try {
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties)
|
||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||
SchemaMigration(
|
||||
val schemaMigration = SchemaMigration(
|
||||
schemaService.schemaOptions.keys,
|
||||
dataSource,
|
||||
!isH2Database(jdbcUrl),
|
||||
databaseConfig).nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })return CordaPersistence( databaseConfig, schemaService.schemaOptions.keys, jdbcUrl,attributeConverters)}
|
||||
|
||||
fun CordaPersistence.hikariStart(hikariProperties: Properties) {
|
||||
try {
|
||||
start(DataSourceFactory.createDataSource(hikariProperties))
|
||||
databaseConfig
|
||||
)
|
||||
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
|
||||
start(dataSource, jdbcUrl)
|
||||
} catch (ex: Exception) {
|
||||
when {
|
||||
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)
|
||||
|
@ -27,7 +27,6 @@ import net.corda.node.services.config.RelayConfiguration
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import org.fusesource.jansi.Ansi
|
||||
import org.fusesource.jansi.AnsiConsole
|
||||
import java.io.IOException
|
||||
@ -184,7 +183,7 @@ D""".trimStart()
|
||||
return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
|
||||
}
|
||||
|
||||
override fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
|
||||
override fun makeStateMachineManager(): StateMachineManager {
|
||||
if (configuration.enterpriseConfiguration.useMultiThreadedSMM) {
|
||||
val executor = makeStateMachineExecutorService()
|
||||
runOnStop += { executor.shutdown() }
|
||||
@ -199,7 +198,7 @@ D""".trimStart()
|
||||
)
|
||||
} else {
|
||||
log.info("Single-threaded state machine manager with 1 thread.")
|
||||
return super.makeStateMachineManager(database)
|
||||
return super.makeStateMachineManager()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -182,6 +182,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
nodeExecutor = serverThread,
|
||||
database = database,
|
||||
networkMap = networkMapCache,
|
||||
metricRegistry = metricRegistry,
|
||||
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
|
||||
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values
|
||||
)
|
||||
@ -214,10 +215,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
startLocalRpcBroker(securityManager)
|
||||
}
|
||||
|
||||
val advertisedAddress = info.addresses.single()
|
||||
val externalBridge = configuration.enterpriseConfiguration.externalBridge
|
||||
val bridgeControlListener = if (externalBridge == null || !externalBridge) {
|
||||
BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||
BridgeControlListener(configuration, client.serverAddress, networkParameters.maxMessageSize)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
@ -243,7 +243,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
start()
|
||||
}
|
||||
// Start P2P bridge service
|
||||
bridgeControlListener.apply {
|
||||
bridgeControlListener?.apply {
|
||||
closeOnStop()
|
||||
start()
|
||||
}
|
||||
@ -256,8 +256,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
client.start(
|
||||
myIdentity = nodeInfo.legalIdentities[0].owningKey,
|
||||
serviceIdentity = if (nodeInfo.legalIdentities.size == 1) null else nodeInfo.legalIdentities[1].owningKey,
|
||||
advertisedAddress = nodeInfo.addresses[0],
|
||||
maxMessageSize = networkParameters.maxMessageSize
|
||||
advertisedAddress = nodeInfo.addresses.single(),
|
||||
maxMessageSize = networkParameters.maxMessageSize,
|
||||
legalName = nodeInfo.legalIdentities[0].name.toString()
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val database: CordaPersistence,
|
||||
private val networkMap: NetworkMapCacheInternal,
|
||||
private val metricRegistry: MetricRegistry,
|
||||
val legalName: String,
|
||||
private val isDrainingModeOn: () -> Boolean,
|
||||
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
|
||||
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver {
|
||||
@ -154,7 +153,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
* in the network map data.
|
||||
* @param maxMessageSize A bound applied to the message size.
|
||||
*/
|
||||
fun start(myIdentity: PublicKey, serviceIdentity: PublicKey?, maxMessageSize: Int, advertisedAddress: NetworkHostAndPort = serverAddress) {
|
||||
fun start(myIdentity: PublicKey, serviceIdentity: PublicKey?, maxMessageSize: Int, advertisedAddress: NetworkHostAndPort = serverAddress, legalName: String) {
|
||||
this.myIdentity = myIdentity
|
||||
this.serviceIdentity = serviceIdentity
|
||||
this.advertisedAddress = advertisedAddress
|
||||
|
@ -118,7 +118,7 @@ class MultiThreadedStateMachineManager(
|
||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
||||
private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID
|
||||
|
||||
private var checkpointSerializationContext: SerializationContext? = null
|
||||
private var tokenizableServices: List<Any>? = null
|
||||
|
@ -70,10 +70,7 @@ class NodeTest {
|
||||
val configuration = createConfig(ALICE_NAME)
|
||||
val info = VersionInfo(789, "3.0", "SNAPSHOT", "R3")
|
||||
configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use {
|
||||
val versionInfo = rigorousMock<VersionInfo>().also {
|
||||
doReturn(platformVersion).whenever(it).platformVersion
|
||||
}
|
||||
val node = Node(configuration, versionInfo, initialiseSerialization = false)
|
||||
val node = Node(configuration, info, initialiseSerialization = false)
|
||||
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
|
||||
}
|
||||
}
|
||||
@ -179,7 +176,11 @@ class NodeTest {
|
||||
flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1),
|
||||
rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null),
|
||||
messagingServerAddress = null,
|
||||
notary = null
|
||||
notary = null,
|
||||
enterpriseConfiguration = EnterpriseConfiguration(
|
||||
mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)
|
||||
),
|
||||
relay = null
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -166,11 +166,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
bobNode.dispose()
|
||||
|
||||
aliceNode.database.transaction {
|
||||
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
aliceNode.internals.manuallyCloseDB()
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
bobNode.internals.manuallyCloseDB()
|
||||
}
|
||||
@ -224,11 +224,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
bobNode.dispose()
|
||||
|
||||
aliceNode.database.transaction {
|
||||
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
aliceNode.internals.manuallyCloseDB()
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
bobNode.internals.manuallyCloseDB()
|
||||
}
|
||||
@ -281,7 +281,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
|
||||
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).hasSize(1)
|
||||
}
|
||||
|
||||
val storage = bobNode.services.validatedTransactions
|
||||
@ -314,10 +314,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
|
||||
assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
aliceNode.database.transaction {
|
||||
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
|
||||
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
|
||||
bobNode.database.transaction {
|
||||
@ -340,8 +340,8 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
return mockNet.createNode(InternalMockNodeParameters(legalName = name), nodeFactory = { args, _ ->
|
||||
object : InternalMockNetwork.MockNode(args) {
|
||||
// That constructs a recording tx storage
|
||||
override fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage {
|
||||
return RecordingTransactionStorage(database, super.makeTransactionStorage(database, transactionCacheSizeBytes))
|
||||
override fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
|
||||
return RecordingTransactionStorage(database, super.makeTransactionStorage(transactionCacheSizeBytes))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user