mirror of
https://github.com/corda/corda.git
synced 2025-05-02 08:43:15 +00:00
Moved ThreadBox into core.internal
This commit is contained in:
parent
08d2e351d2
commit
a485bbada8
@ -10,7 +10,7 @@ import com.google.common.cache.RemovalCause
|
|||||||
import com.google.common.cache.RemovalListener
|
import com.google.common.cache.RemovalListener
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.getOrThrow
|
import net.corda.core.getOrThrow
|
||||||
import net.corda.core.internal.LazyPool
|
import net.corda.core.internal.LazyPool
|
||||||
|
@ -25,12 +25,10 @@ import java.util.concurrent.CompletableFuture
|
|||||||
import java.util.concurrent.ExecutionException
|
import java.util.concurrent.ExecutionException
|
||||||
import java.util.concurrent.Future
|
import java.util.concurrent.Future
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
|
||||||
import java.util.zip.Deflater
|
import java.util.zip.Deflater
|
||||||
import java.util.zip.ZipEntry
|
import java.util.zip.ZipEntry
|
||||||
import java.util.zip.ZipInputStream
|
import java.util.zip.ZipInputStream
|
||||||
import java.util.zip.ZipOutputStream
|
import java.util.zip.ZipOutputStream
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
|
|
||||||
// TODO: Review by EOY2016 if we ever found these utilities helpful.
|
// TODO: Review by EOY2016 if we ever found these utilities helpful.
|
||||||
val Int.bd: BigDecimal get() = BigDecimal(this)
|
val Int.bd: BigDecimal get() = BigDecimal(this)
|
||||||
@ -163,33 +161,6 @@ fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T
|
|||||||
|
|
||||||
fun <T> Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime(label, this, body)
|
fun <T> Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime(label, this, body)
|
||||||
|
|
||||||
/**
|
|
||||||
* A threadbox is a simple utility that makes it harder to forget to take a lock before accessing some shared state.
|
|
||||||
* Simply define a private class to hold the data that must be grouped under the same lock, and then pass the only
|
|
||||||
* instance to the ThreadBox constructor. You can now use the [locked] method with a lambda to take the lock in a
|
|
||||||
* way that ensures it'll be released if there's an exception.
|
|
||||||
*
|
|
||||||
* Note that this technique is not infallible: if you capture a reference to the fields in another lambda which then
|
|
||||||
* gets stored and invoked later, there may still be unsafe multi-threaded access going on, so watch out for that.
|
|
||||||
* This is just a simple guard rail that makes it harder to slip up.
|
|
||||||
*
|
|
||||||
* Example:
|
|
||||||
*
|
|
||||||
* private class MutableState { var i = 5 }
|
|
||||||
* private val state = ThreadBox(MutableState())
|
|
||||||
*
|
|
||||||
* val ii = state.locked { i }
|
|
||||||
*/
|
|
||||||
class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock()) {
|
|
||||||
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
|
|
||||||
inline fun <R> alreadyLocked(body: T.() -> R): R {
|
|
||||||
check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
|
|
||||||
return body(content)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a path to a zip file, extracts it to the given directory.
|
* Given a path to a zip file, extracts it to the given directory.
|
||||||
*/
|
*/
|
||||||
|
32
core/src/main/kotlin/net/corda/core/internal/ThreadBox.kt
Normal file
32
core/src/main/kotlin/net/corda/core/internal/ThreadBox.kt
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package net.corda.core.internal
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
|
import kotlin.concurrent.withLock
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A threadbox is a simple utility that makes it harder to forget to take a lock before accessing some shared state.
|
||||||
|
* Simply define a private class to hold the data that must be grouped under the same lock, and then pass the only
|
||||||
|
* instance to the ThreadBox constructor. You can now use the [locked] method with a lambda to take the lock in a
|
||||||
|
* way that ensures it'll be released if there's an exception.
|
||||||
|
*
|
||||||
|
* Note that this technique is not infallible: if you capture a reference to the fields in another lambda which then
|
||||||
|
* gets stored and invoked later, there may still be unsafe multi-threaded access going on, so watch out for that.
|
||||||
|
* This is just a simple guard rail that makes it harder to slip up.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
*```
|
||||||
|
* private class MutableState { var i = 5 }
|
||||||
|
* private val state = ThreadBox(MutableState())
|
||||||
|
*
|
||||||
|
* val ii = state.locked { i }
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock()) {
|
||||||
|
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
|
||||||
|
inline fun <R> alreadyLocked(body: T.() -> R): R {
|
||||||
|
check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
|
||||||
|
return body(content)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun checkNotLocked(): Unit = check(!lock.isHeldByCurrentThread)
|
||||||
|
}
|
@ -1,7 +1,7 @@
|
|||||||
package net.corda.node.services.events
|
package net.corda.node.services.events
|
||||||
|
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.contracts.SchedulableState
|
import net.corda.core.contracts.SchedulableState
|
||||||
import net.corda.core.contracts.ScheduledActivity
|
import net.corda.core.contracts.ScheduledActivity
|
||||||
import net.corda.core.contracts.ScheduledStateRef
|
import net.corda.core.contracts.ScheduledStateRef
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.keys
|
package net.corda.node.services.keys
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.crypto.DigitalSignature
|
import net.corda.core.crypto.DigitalSignature
|
||||||
import net.corda.core.crypto.generateKeyPair
|
import net.corda.core.crypto.generateKeyPair
|
||||||
import net.corda.core.crypto.keys
|
import net.corda.core.crypto.keys
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.keys
|
package net.corda.node.services.keys
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.crypto.DigitalSignature
|
import net.corda.core.crypto.DigitalSignature
|
||||||
import net.corda.core.crypto.generateKeyPair
|
import net.corda.core.crypto.generateKeyPair
|
||||||
import net.corda.core.crypto.keys
|
import net.corda.core.crypto.keys
|
||||||
|
@ -7,6 +7,7 @@ import net.corda.core.*
|
|||||||
import net.corda.core.crypto.*
|
import net.corda.core.crypto.*
|
||||||
import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_TLS
|
import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_TLS
|
||||||
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
|
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.node.services.NetworkMapCache
|
import net.corda.core.node.services.NetworkMapCache
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import net.corda.core.ThreadBox
|
|
||||||
import net.corda.core.andForget
|
import net.corda.core.andForget
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.getOrThrow
|
import net.corda.core.getOrThrow
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
import net.corda.core.messaging.MessageRecipients
|
import net.corda.core.messaging.MessageRecipients
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
@ -468,8 +468,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun sendWithRetry(retryCount: Int, address: String, message: ClientMessage, retryId: Long) {
|
private fun sendWithRetry(retryCount: Int, address: String, message: ClientMessage, retryId: Long) {
|
||||||
fun randomiseDuplicateId(message: ClientMessage) {
|
fun ClientMessage.randomiseDuplicateId() {
|
||||||
message.putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.trace { "Attempting to retry #$retryCount message delivery for $retryId" }
|
log.trace { "Attempting to retry #$retryCount message delivery for $retryId" }
|
||||||
@ -479,7 +479,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
randomiseDuplicateId(message)
|
message.randomiseDuplicateId()
|
||||||
|
|
||||||
state.locked {
|
state.locked {
|
||||||
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
|
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package net.corda.node.services.network
|
package net.corda.node.services.network
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting
|
import com.google.common.annotations.VisibleForTesting
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.crypto.DigitalSignature
|
import net.corda.core.crypto.DigitalSignature
|
||||||
import net.corda.core.crypto.SignedData
|
import net.corda.core.crypto.SignedData
|
||||||
import net.corda.core.crypto.isFulfilledBy
|
import net.corda.core.crypto.isFulfilledBy
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.network
|
package net.corda.node.services.network
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.identity.PartyAndCertificate
|
import net.corda.core.identity.PartyAndCertificate
|
||||||
import net.corda.core.messaging.SingleMessageRecipient
|
import net.corda.core.messaging.SingleMessageRecipient
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.persistence
|
package net.corda.node.services.persistence
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.bufferUntilSubscribed
|
import net.corda.core.bufferUntilSubscribed
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.persistence
|
package net.corda.node.services.persistence
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.bufferUntilSubscribed
|
import net.corda.core.bufferUntilSubscribed
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
|
@ -8,7 +8,7 @@ import com.esotericsoftware.kryo.KryoException
|
|||||||
import com.google.common.collect.HashMultimap
|
import com.google.common.collect.HashMultimap
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import com.google.common.util.concurrent.MoreExecutors
|
import com.google.common.util.concurrent.MoreExecutors
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.bufferUntilSubscribed
|
import net.corda.core.bufferUntilSubscribed
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.transactions
|
package net.corda.node.services.transactions
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.contracts.StateRef
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.node.services.vault
|
package net.corda.node.services.vault
|
||||||
|
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.bufferUntilSubscribed
|
import net.corda.core.bufferUntilSubscribed
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.contracts.StateAndRef
|
import net.corda.core.contracts.StateAndRef
|
||||||
|
@ -12,7 +12,7 @@ import io.requery.kotlin.notNull
|
|||||||
import io.requery.query.RowExpression
|
import io.requery.query.RowExpression
|
||||||
import net.corda.contracts.asset.Cash
|
import net.corda.contracts.asset.Cash
|
||||||
import net.corda.contracts.asset.OnLedgerAsset
|
import net.corda.contracts.asset.OnLedgerAsset
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.bufferUntilSubscribed
|
import net.corda.core.bufferUntilSubscribed
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
|
@ -8,7 +8,7 @@ import net.corda.contracts.Tenor
|
|||||||
import net.corda.contracts.math.CubicSplineInterpolator
|
import net.corda.contracts.math.CubicSplineInterpolator
|
||||||
import net.corda.contracts.math.Interpolator
|
import net.corda.contracts.math.Interpolator
|
||||||
import net.corda.contracts.math.InterpolatorFactory
|
import net.corda.contracts.math.InterpolatorFactory
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.contracts.Command
|
import net.corda.core.contracts.Command
|
||||||
import net.corda.core.crypto.DigitalSignature
|
import net.corda.core.crypto.DigitalSignature
|
||||||
import net.corda.core.crypto.MerkleTreeException
|
import net.corda.core.crypto.MerkleTreeException
|
||||||
|
@ -15,6 +15,7 @@ import net.corda.core.crypto.X509Utilities
|
|||||||
import net.corda.core.crypto.appendToCommonName
|
import net.corda.core.crypto.appendToCommonName
|
||||||
import net.corda.core.crypto.commonName
|
import net.corda.core.crypto.commonName
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.internal.times
|
import net.corda.core.internal.times
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
|
@ -3,7 +3,7 @@ package net.corda.testing.node
|
|||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.crypto.X509Utilities
|
import net.corda.core.crypto.X509Utilities
|
||||||
import net.corda.core.getOrThrow
|
import net.corda.core.getOrThrow
|
||||||
import net.corda.core.messaging.AllPossibleRecipients
|
import net.corda.core.messaging.AllPossibleRecipients
|
||||||
|
Loading…
x
Reference in New Issue
Block a user