rewrite NodeMessagingClient to use Hibernate

This commit is contained in:
szymonsztuka 2017-08-22 13:22:15 +01:00 committed by GitHub
parent 09adba8275
commit 709495957d
4 changed files with 98 additions and 37 deletions

View File

@ -11,6 +11,9 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
@ -36,13 +39,12 @@ import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.security.PublicKey
import java.time.Instant
import java.util.*
import java.util.concurrent.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -68,12 +70,12 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration,
val versionInfo: VersionInfo,
val serverAddress: NetworkHostAndPort,
val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val versionInfo: VersionInfo,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey?,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: CordaPersistence,
val networkMapRegistrationFuture: CordaFuture<Unit>,
private val networkMapRegistrationFuture: CordaFuture<Unit>,
val monitoringService: MonitoringService,
advertisedAddress: NetworkHostAndPort = serverAddress
) : ArtemisMessagingComponent(), MessagingService {
@ -93,6 +95,38 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
private val messageMaxRetryCount: Int = 3
fun createProcessedMessage(): AppendOnlyPersistentMap<UUID, Instant, ProcessedMessage, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(UUID.fromString(it.uuid), it.insertionTime) },
toPersistentEntity = { key: UUID, value: Instant ->
ProcessedMessage().apply {
uuid = key.toString()
insertionTime = value
}
},
persistentEntityClass = ProcessedMessage::class.java
)
}
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
return PersistentMap(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.key,
Pair(it.message.deserialize( context = SerializationDefaults.STORAGE_CONTEXT),
it.recipients.deserialize( context = SerializationDefaults.STORAGE_CONTEXT))
) },
toPersistentEntity = { _key: Long, (_message: Message, _recipient: MessageRecipients): Pair<Message, MessageRecipients> ->
RetryMessage().apply {
key = _key
message = _message.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
recipients = _recipient.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
}
},
persistentEntityClass = RetryMessage::class.java
)
}
}
private class InnerState {
@ -107,11 +141,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
var verificationResponseConsumer: ClientConsumer? = null
}
val messagesToRedeliver = database.transaction {
JDBCHashMap<Long, Pair<Message, MessageRecipients>>("${NODE_DATABASE_PREFIX}message_retry", true)
private val messagesToRedeliver = database.transaction {
createMessageToRedeliver()
}
val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
private val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
val verifierService = when (config.verifierType) {
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
@ -139,17 +173,33 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}message_ids") {
val uuid = uuidString("message_id")
}
private val processedMessages = createProcessedMessage()
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
class ProcessedMessage(
@Id
@Column(name = "message_id", length = 36)
var uuid: String = "",
@Column(name = "insertion_time")
var insertionTime: Instant = Instant.now()
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry")
class RetryMessage(
@Id
@Column(name = "message_id", length = 36)
var key: Long = 0,
@Lob
@Column
var message: ByteArray = ByteArray(0),
@Lob
@Column
var recipients: ByteArray = ByteArray(0)
)
fun start(rpcOps: RPCOps, userService: RPCUserService) {
@ -374,7 +424,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
callHandlers(msg, deliverTo)
}
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
processedMessages[msg.uniqueMessageId] = Instant.now()
}
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.api.SchemaService
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
@ -41,7 +42,9 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
NodeSchedulerService.PersistentScheduledState::class.java,
NodeAttachmentService.DBAttachment::class.java,
PersistentNetworkMapService.NetworkNode::class.java,
PersistentNetworkMapService.NetworkSubscriber::class.java
PersistentNetworkMapService.NetworkSubscriber::class.java,
NodeMessagingClient.ProcessedMessage::class.java,
NodeMessagingClient.RetryMessage::class.java
))
// Required schemas are those used by internal Corda services

View File

@ -5,11 +5,11 @@ import java.util.*
/**
* Implements a caching layer on top of an *append-only* table accessed via Hibernate mapping. Note that if the same key is [put] twice the
* Implements a caching layer on top of an *append-only* table accessed via Hibernate mapping. Note that if the same key is [set] twice the
* behaviour is unpredictable! There is a best-effort check for double inserts, but this should *not* be relied on, so
* ONLY USE THIS IF YOUR TABLE IS APPEND-ONLY
*/
class AppendOnlyPersistentMap<K, V, E, EK> (
class AppendOnlyPersistentMap<K, V, E, out EK> (
val toPersistentEntityKey: (K) -> EK,
val fromPersistentEntity: (E) -> Pair<K,V>,
val toPersistentEntity: (key: K, value: V) -> E,
@ -84,7 +84,7 @@ class AppendOnlyPersistentMap<K, V, E, EK> (
*/
operator fun set(key: K, value: V) =
set(key, value, logWarning = false) {
key,value -> DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
k, v -> DatabaseTransactionManager.current().session.save(toPersistentEntity(k, v))
null
}
@ -95,10 +95,10 @@ class AppendOnlyPersistentMap<K, V, E, EK> (
*/
fun addWithDuplicatesAllowed(key: K, value: V): Boolean =
set(key, value) {
key, value ->
val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
k, v ->
val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(k))
if (existingEntry == null) {
DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
DatabaseTransactionManager.current().session.save(toPersistentEntity(k, v))
null
} else {
fromPersistentEntity(existingEntry).second
@ -110,4 +110,5 @@ class AppendOnlyPersistentMap<K, V, E, EK> (
return result?.let(fromPersistentEntity)?.second
}
operator fun contains(key: K) = get(key) != null
}

View File

@ -11,7 +11,7 @@ import java.util.*
/**
* Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
*/
class PersistentMap<K, V, E, EK> (
class PersistentMap<K, V, E, out EK> (
val toPersistentEntityKey: (K) -> EK,
val fromPersistentEntity: (E) -> Pair<K,V>,
val toPersistentEntity: (key: K, value: V) -> E,
@ -47,7 +47,7 @@ class PersistentMap<K, V, E, EK> (
RemovalCause.EXPIRED, RemovalCause.SIZE, RemovalCause.COLLECTED -> {
log.error("Entry was removed from cache!!!")
}
//else do nothing for RemovalCause.REPLACED
RemovalCause.REPLACED -> {}
}
}
}
@ -57,7 +57,7 @@ class PersistentMap<K, V, E, EK> (
}
fun all(): Sequence<Pair<K, V>> {
return cache.asMap().map { entry -> Pair(entry.key as K, entry.value.get()) }.asSequence()
return cache.asMap().asSequence().map { Pair(it.key, it.value.get()) }
}
override val size = cache.size().toInt()
@ -98,8 +98,8 @@ class PersistentMap<K, V, E, EK> (
operator fun set(key: K, value: V) =
set(key, value,
logWarning = false,
store = { key: K, value: V ->
DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
store = { k: K, v: V ->
DatabaseTransactionManager.current().session.save(toPersistentEntity(k, v))
null
},
replace = { _: K, _: V -> Unit }
@ -112,10 +112,10 @@ class PersistentMap<K, V, E, EK> (
*/
fun addWithDuplicatesAllowed(key: K, value: V) =
set(key, value,
store = { key, value ->
val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
store = { k, v ->
val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(k))
if (existingEntry == null) {
DatabaseTransactionManager.current().session.save(toPersistentEntity(key, value))
DatabaseTransactionManager.current().session.save(toPersistentEntity(k, v))
null
} else {
fromPersistentEntity(existingEntry).second
@ -128,7 +128,7 @@ class PersistentMap<K, V, E, EK> (
* Associates the specified value with the specified key in this map and persists it.
* @return true if added key was unique, otherwise false
*/
fun addWithDuplicatesReplaced(key: K, value: V) =
private fun addWithDuplicatesReplaced(key: K, value: V) =
set(key, value,
logWarning = false,
store = { k: K, v: V -> merge(k, v) },
@ -249,4 +249,11 @@ class PersistentMap<K, V, E, EK> (
addWithDuplicatesReplaced(key, value)
return old.orElse(null)
}
fun load() {
val session = DatabaseTransactionManager.current().session
val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass)
criteriaQuery.select(criteriaQuery.from(persistentEntityClass))
cache.getAll(session.createQuery(criteriaQuery).resultList.map { e -> fromPersistentEntity(e as E).first }.asIterable())
}
}