mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
Revert Party.Full in preference for a less invasive change
This commit is contained in:
@ -30,7 +30,7 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
lateinit var alice: NodeHandle
|
||||
lateinit var notaries: List<NodeHandle>
|
||||
lateinit var aliceProxy: CordaRPCOps
|
||||
lateinit var raftNotaryIdentity: Party.Full
|
||||
lateinit var raftNotaryIdentity: Party
|
||||
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
|
||||
|
||||
override fun setup() = driver {
|
||||
@ -80,7 +80,7 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
}
|
||||
|
||||
// The state machines added in the notaries should map one-to-one to notarisation requests
|
||||
val notarisationsPerNotary = HashMap<Party.Full, Int>()
|
||||
val notarisationsPerNotary = HashMap<Party, Int>()
|
||||
notaryStateMachines.expectEvents(isStrict = false) {
|
||||
replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
|
||||
expect(match = { it.second is StateMachineUpdate.Added }) {
|
||||
@ -119,7 +119,7 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
paySelf(5.POUNDS)
|
||||
}
|
||||
|
||||
val notarisationsPerNotary = HashMap<Party.Full, Int>()
|
||||
val notarisationsPerNotary = HashMap<Party, Int>()
|
||||
notaryStateMachines.expectEvents(isStrict = false) {
|
||||
replicate<Pair<NodeInfo, StateMachineUpdate>>(30) {
|
||||
expect(match = { it.second is StateMachineUpdate.Added }) {
|
||||
|
@ -56,7 +56,7 @@ class RaftNotaryServiceTests : NodeBasedTest() {
|
||||
assertEquals(error.tx, secondSpendTx.tx)
|
||||
}
|
||||
|
||||
private fun issueState(node: AbstractNode, notary: Party.Full, notaryKey: KeyPair): StateAndRef<*> {
|
||||
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {
|
||||
return databaseTransaction(node.database) {
|
||||
val tx = DummyContract.generateInitial(node.info.legalIdentity.ref(0), Random().nextInt(), notary)
|
||||
tx.signWith(node.services.legalIdentityKey)
|
||||
|
@ -226,7 +226,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
.withMessageContaining(permission)
|
||||
}
|
||||
|
||||
private fun startBobAndCommunicateWithAlice(): Party.Full {
|
||||
private fun startBobAndCommunicateWithAlice(): Party {
|
||||
val bob = startNode("Bob").getOrThrow()
|
||||
bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow)
|
||||
val bobParty = bob.info.legalIdentity
|
||||
@ -235,12 +235,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
return bobParty
|
||||
}
|
||||
|
||||
private class SendFlow(val otherParty: Party.Full, val payload: Any) : FlowLogic<Unit>() {
|
||||
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, payload)
|
||||
}
|
||||
|
||||
private class ReceiveFlow(val otherParty: Party.Full) : FlowLogic<Any>() {
|
||||
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call() = receive<Any>(otherParty).unwrap { it }
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
|
||||
private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture<NetworkMapService.RegistrationResponse> {
|
||||
val nodeInfo = NodeInfo(net.myAddress, Party.Full(registrationName, identity.public))
|
||||
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public))
|
||||
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
|
||||
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
|
||||
return net.sendRequest<NetworkMapService.RegistrationResponse>(REGISTER_FLOW_TOPIC, request, networkMapNode.net.myAddress)
|
||||
|
@ -58,7 +58,7 @@ interface DriverDSLExposedInterface {
|
||||
/**
|
||||
* Starts a [Node] in a separate process.
|
||||
*
|
||||
* @param providedName Optional name of the node, which will be its legal name in [Party.Full]. Defaults to something
|
||||
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
|
||||
* random. Note that this must be unique as the driver uses it as a primary key!
|
||||
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
|
||||
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
|
||||
@ -76,13 +76,13 @@ interface DriverDSLExposedInterface {
|
||||
* @param clusterSize Number of nodes to create for the cluster.
|
||||
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
|
||||
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
|
||||
* @return The [Party.Full] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
|
||||
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
|
||||
*/
|
||||
fun startNotaryCluster(
|
||||
notaryName: String,
|
||||
clusterSize: Int = 3,
|
||||
type: ServiceType = RaftValidatingNotaryService.type,
|
||||
rpcUsers: List<User> = emptyList()): Future<Pair<Party.Full, List<NodeHandle>>>
|
||||
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
|
||||
|
||||
/**
|
||||
* Starts a web server for a node
|
||||
@ -392,7 +392,7 @@ open class DriverDSL(
|
||||
clusterSize: Int,
|
||||
type: ServiceType,
|
||||
rpcUsers: List<User>
|
||||
): ListenableFuture<Pair<Party.Full, List<NodeHandle>>> {
|
||||
): ListenableFuture<Pair<Party, List<NodeHandle>>> {
|
||||
val nodeNames = (1..clusterSize).map { "Notary Node $it" }
|
||||
val paths = nodeNames.map { driverDirectory / it }
|
||||
ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName)
|
||||
|
@ -104,7 +104,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// low-performance prototyping period.
|
||||
protected abstract val serverThread: AffinityExecutor
|
||||
|
||||
private val flowFactories = ConcurrentHashMap<Class<*>, (Party.Full) -> FlowLogic<*>>()
|
||||
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
|
||||
protected val partyKeys = mutableSetOf<KeyPair>()
|
||||
|
||||
val services = object : ServiceHubInternal() {
|
||||
@ -127,13 +127,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return serverThread.fetchFrom { smm.add(logic) }
|
||||
}
|
||||
|
||||
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party.Full) -> FlowLogic<*>) {
|
||||
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
|
||||
require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" }
|
||||
log.info("Registering flow ${markerClass.java.name}")
|
||||
flowFactories[markerClass.java] = flowFactory
|
||||
}
|
||||
|
||||
override fun getFlowFactory(markerClass: Class<*>): ((Party.Full) -> FlowLogic<*>)? {
|
||||
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {
|
||||
return flowFactories[markerClass]
|
||||
}
|
||||
|
||||
@ -491,10 +491,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage) =
|
||||
StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage)
|
||||
|
||||
protected fun obtainLegalIdentity(): Party.Full = obtainKeyPair(configuration.baseDirectory, PRIVATE_KEY_FILE_NAME, PUBLIC_IDENTITY_FILE_NAME).first
|
||||
protected fun obtainLegalIdentity(): Party = obtainKeyPair(configuration.baseDirectory, PRIVATE_KEY_FILE_NAME, PUBLIC_IDENTITY_FILE_NAME).first
|
||||
protected fun obtainLegalIdentityKey(): KeyPair = obtainKeyPair(configuration.baseDirectory, PRIVATE_KEY_FILE_NAME, PUBLIC_IDENTITY_FILE_NAME).second
|
||||
|
||||
private fun obtainKeyPair(dir: Path, privateKeyFileName: String, publicKeyFileName: String, serviceName: String? = null): Pair<Party.Full, KeyPair> {
|
||||
private fun obtainKeyPair(dir: Path, privateKeyFileName: String, publicKeyFileName: String, serviceName: String? = null): Pair<Party, KeyPair> {
|
||||
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
|
||||
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
|
||||
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
|
||||
@ -508,7 +508,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
log.info("Identity key not found, generating fresh key!")
|
||||
val keyPair: KeyPair = generateKeyPair()
|
||||
keyPair.serialize().writeToFile(privKeyFile)
|
||||
val myIdentity = Party.Full(identityName, keyPair.public)
|
||||
val myIdentity = Party(identityName, keyPair.public)
|
||||
// We include the Party class with the file here to help catch mixups when admins provide files of the
|
||||
// wrong type by mistake.
|
||||
myIdentity.serialize().writeToFile(pubIdentityFile)
|
||||
@ -517,7 +517,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// Check that the identity in the config file matches the identity file we have stored to disk.
|
||||
// This is just a sanity check. It shouldn't fail unless the admin has fiddled with the files and messed
|
||||
// things up for us.
|
||||
val myIdentity = pubIdentityFile.readAll().deserialize<Party.Full>()
|
||||
val myIdentity = pubIdentityFile.readAll().deserialize<Party>()
|
||||
if (myIdentity.name != identityName)
|
||||
throw ConfigurationException("The legal name in the config file doesn't match the stored identity file:" +
|
||||
"$identityName vs ${myIdentity.name}")
|
||||
|
@ -13,17 +13,17 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class InMemoryIdentityService() : SingletonSerializeAsToken(), IdentityService {
|
||||
private val keyToParties = ConcurrentHashMap<CompositeKey, Party.Full>()
|
||||
private val nameToParties = ConcurrentHashMap<String, Party.Full>()
|
||||
private val keyToParties = ConcurrentHashMap<CompositeKey, Party>()
|
||||
private val nameToParties = ConcurrentHashMap<String, Party>()
|
||||
|
||||
override fun registerIdentity(party: Party.Full) {
|
||||
override fun registerIdentity(party: Party) {
|
||||
keyToParties[party.owningKey] = party
|
||||
nameToParties[party.name] = party
|
||||
}
|
||||
|
||||
// We give the caller a copy of the data set to avoid any locking problems
|
||||
override fun getAllIdentities(): Iterable<Party.Full> = ArrayList(keyToParties.values)
|
||||
override fun getAllIdentities(): Iterable<Party> = ArrayList(keyToParties.values)
|
||||
|
||||
override fun partyFromKey(key: CompositeKey): Party.Full? = keyToParties[key]
|
||||
override fun partyFromName(name: String): Party.Full? = nameToParties[name]
|
||||
override fun partyFromKey(key: CompositeKey): Party? = keyToParties[key]
|
||||
override fun partyFromName(name: String): Party? = nameToParties[name]
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
|
||||
register(SignedTransaction::class.java, ImmutableClassSerializer(SignedTransaction::class))
|
||||
register(WireTransaction::class.java, WireTransactionSerializer)
|
||||
register(SerializedBytes::class.java, SerializedBytesSerializer)
|
||||
register(Party.Full::class.java)
|
||||
register(Party::class.java)
|
||||
register(Array<Any>(0,{}).javaClass)
|
||||
register(Class::class.java, ClassSerializer)
|
||||
|
||||
|
@ -75,7 +75,7 @@ interface NetworkMapService {
|
||||
|
||||
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
|
||||
|
||||
class QueryIdentityRequest(val identity: Party.Full,
|
||||
class QueryIdentityRequest(val identity: Party,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long) : ServiceRequestMessage
|
||||
|
||||
@ -100,7 +100,7 @@ interface NetworkMapService {
|
||||
@ThreadSafe
|
||||
class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) {
|
||||
|
||||
override val registeredNodes: MutableMap<Party.Full, NodeRegistrationInfo> = ConcurrentHashMap()
|
||||
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
|
||||
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
|
||||
|
||||
init {
|
||||
@ -117,7 +117,7 @@ class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkM
|
||||
@ThreadSafe
|
||||
abstract class AbstractNetworkMapService
|
||||
(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) {
|
||||
protected abstract val registeredNodes: MutableMap<Party.Full, NodeRegistrationInfo>
|
||||
protected abstract val registeredNodes: MutableMap<Party, NodeRegistrationInfo>
|
||||
|
||||
// Map from subscriber address, to most recently acknowledged update map version.
|
||||
protected abstract val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
|
||||
@ -275,7 +275,7 @@ abstract class AbstractNetworkMapService
|
||||
// Update the current value atomically, so that if multiple updates come
|
||||
// in on different threads, there is no risk of a race condition while checking
|
||||
// sequence numbers.
|
||||
val registrationInfo = registeredNodes.compute(node.legalIdentity, { mapKey: Party.Full, existing: NodeRegistrationInfo? ->
|
||||
val registrationInfo = registeredNodes.compute(node.legalIdentity, { mapKey: Party, existing: NodeRegistrationInfo? ->
|
||||
changed = existing == null || existing.reg.serial < change.serial
|
||||
if (changed) {
|
||||
when (change.type) {
|
||||
|
@ -23,17 +23,17 @@ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetwor
|
||||
val registrationInfo = blob("node_registration_info")
|
||||
}
|
||||
|
||||
override val registeredNodes: MutableMap<Party.Full, NodeRegistrationInfo> = synchronizedMap(object : AbstractJDBCHashMap<Party.Full, NodeRegistrationInfo, Table>(Table, loadOnInit = true) {
|
||||
override fun keyFromRow(row: ResultRow): Party.Full = Party.Full(row[table.nodeParty.name], row[table.nodeParty.owningKey])
|
||||
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = synchronizedMap(object : AbstractJDBCHashMap<Party, NodeRegistrationInfo, Table>(Table, loadOnInit = true) {
|
||||
override fun keyFromRow(row: ResultRow): Party = Party(row[table.nodeParty.name], row[table.nodeParty.owningKey])
|
||||
|
||||
override fun valueFromRow(row: ResultRow): NodeRegistrationInfo = deserializeFromBlob(row[table.registrationInfo])
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<Party.Full, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<Party, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.nodeParty.name] = entry.key.name
|
||||
insert[table.nodeParty.owningKey] = entry.key.owningKey
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<Party.Full, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<Party, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.registrationInfo] = serializeToBlob(entry.value, finalizables)
|
||||
}
|
||||
})
|
||||
|
@ -40,20 +40,20 @@ object DataVending {
|
||||
services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
|
||||
}
|
||||
|
||||
private class FetchTransactionsHandler(otherParty: Party.Full) : FetchDataHandler<SignedTransaction>(otherParty) {
|
||||
private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
|
||||
override fun getData(id: SecureHash): SignedTransaction? {
|
||||
return serviceHub.storageService.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
||||
private class FetchAttachmentsHandler(otherParty: Party.Full) : FetchDataHandler<ByteArray>(otherParty) {
|
||||
private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
||||
override fun getData(id: SecureHash): ByteArray? {
|
||||
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class FetchDataHandler<out T>(val otherParty: Party.Full) : FlowLogic<Unit>() {
|
||||
private abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
@Throws(FetchDataFlow.HashNotFound::class)
|
||||
override fun call() {
|
||||
@ -75,7 +75,7 @@ object DataVending {
|
||||
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
|
||||
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
|
||||
// cash without from unknown parties?
|
||||
class NotifyTransactionHandler(val otherParty: Party.Full) : FlowLogic<Unit>() {
|
||||
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
|
@ -72,7 +72,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
}
|
||||
|
||||
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party.Full>, FlowSession>()
|
||||
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
|
||||
|
||||
init {
|
||||
logic.stateMachine = this
|
||||
@ -127,7 +127,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
@Suspendable
|
||||
override fun <T : Any> sendAndReceive(receiveType: Class<T>,
|
||||
otherParty: Party.Full,
|
||||
otherParty: Party,
|
||||
payload: Any,
|
||||
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
||||
val session = getConfirmedSession(otherParty, sessionFlow)
|
||||
@ -141,14 +141,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
@Suspendable
|
||||
override fun <T : Any> receive(receiveType: Class<T>,
|
||||
otherParty: Party.Full,
|
||||
otherParty: Party,
|
||||
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
||||
val session = getConfirmedSession(otherParty, sessionFlow) ?: startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
|
||||
return receiveInternal<SessionData>(session).checkPayloadIs(receiveType)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun send(otherParty: Party.Full, payload: Any, sessionFlow: FlowLogic<*>) {
|
||||
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
|
||||
val session = getConfirmedSession(otherParty, sessionFlow)
|
||||
if (session == null) {
|
||||
// Don't send the payload again if it was already piggy-backed on a session init
|
||||
@ -197,7 +197,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun getConfirmedSession(otherParty: Party.Full, sessionFlow: FlowLogic<*>): FlowSession? {
|
||||
private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? {
|
||||
return openSessions[Pair(sessionFlow, otherParty)]?.apply {
|
||||
if (state is FlowSessionState.Initiating) {
|
||||
// Session still initiating, try to retrieve the init response.
|
||||
@ -213,7 +213,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun startNewSession(otherParty: Party.Full, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession {
|
||||
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession {
|
||||
logger.trace { "Initiating a new session with $otherParty" }
|
||||
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty))
|
||||
openSessions[Pair(sessionFlow, otherParty)] = session
|
||||
|
@ -31,7 +31,7 @@ data class SessionData(override val recipientSessionId: Long, val payload: Any)
|
||||
|
||||
data class SessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : ExistingSessionMessage
|
||||
|
||||
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party.Full, val message: M)
|
||||
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M)
|
||||
|
||||
fun <T> ReceivedSessionMessage<SessionData>.checkPayloadIs(type: Class<T>): UntrustworthyData<T> {
|
||||
if (type.isInstance(message.payload)) {
|
||||
|
@ -117,7 +117,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private val totalFinishedFlows = metrics.counter("Flows.Finished")
|
||||
|
||||
private val openSessions = ConcurrentHashMap<Long, FlowSession>()
|
||||
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party.Full>()
|
||||
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
|
||||
|
||||
// Context for tokenized services in checkpoints
|
||||
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
|
||||
@ -238,7 +238,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
}
|
||||
}
|
||||
|
||||
private fun onExistingSessionMessage(message: ExistingSessionMessage, sender: Party.Full) {
|
||||
private fun onExistingSessionMessage(message: ExistingSessionMessage, sender: Party) {
|
||||
val session = openSessions[message.recipientSessionId]
|
||||
if (session != null) {
|
||||
session.fiber.logger.trace { "Received $message on $session" }
|
||||
@ -267,7 +267,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
}
|
||||
}
|
||||
|
||||
private fun onSessionInit(sessionInit: SessionInit, sender: Party.Full) {
|
||||
private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
|
||||
logger.trace { "Received $sessionInit $sender" }
|
||||
val otherPartySessionId = sessionInit.initiatorSessionId
|
||||
|
||||
@ -481,7 +481,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendSessionMessage(party: Party.Full, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null) {
|
||||
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null) {
|
||||
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
|
||||
?: throw IllegalArgumentException("Don't know about party $party")
|
||||
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
|
||||
@ -493,24 +493,23 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
/**
|
||||
* [FlowSessionState] describes the session's state.
|
||||
*
|
||||
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party.Full] corresponding to either a
|
||||
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
|
||||
* specific peer or a service.
|
||||
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
|
||||
* [Initiated.peerParty], and the peer's sessionId has been initialised.
|
||||
*/
|
||||
sealed class FlowSessionState {
|
||||
abstract val sendToParty: Party.Full
|
||||
abstract val sendToParty: Party
|
||||
class Initiating(
|
||||
val otherParty: Party.Full
|
||||
/** This may be a specific peer or a service party */
|
||||
val otherParty: Party /** This may be a specific peer or a service party */
|
||||
) : FlowSessionState() {
|
||||
override val sendToParty: Party.Full get() = otherParty
|
||||
override val sendToParty: Party get() = otherParty
|
||||
}
|
||||
class Initiated(
|
||||
val peerParty: Party.Full,/** This must be a peer party */
|
||||
val peerParty: Party, /** This must be a peer party */
|
||||
val peerSessionId: Long
|
||||
) : FlowSessionState() {
|
||||
override val sendToParty: Party.Full get() = peerParty
|
||||
override val sendToParty: Party get() = peerParty
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ class InMemoryUniquenessProvider() : UniquenessProvider {
|
||||
/** For each input state store the consuming transaction information */
|
||||
private val committedStates = ThreadBox(HashMap<StateRef, UniquenessProvider.ConsumingTx>())
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party.Full) {
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
committedStates.locked {
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
for (inputState in states) {
|
||||
|
@ -21,6 +21,6 @@ abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeA
|
||||
}
|
||||
|
||||
/** Implement a factory that specifies the transaction commit flow for the notary service to use */
|
||||
abstract fun createFlow(otherParty: Party.Full): NotaryFlow.Service
|
||||
abstract fun createFlow(otherParty: Party): NotaryFlow.Service
|
||||
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsT
|
||||
override fun valueFromRow(row: ResultRow): UniquenessProvider.ConsumingTx = UniquenessProvider.ConsumingTx(
|
||||
row[table.consumingTxHash],
|
||||
row[table.consumingIndex],
|
||||
Party.Full(row[table.requestingParty.name], row[table.requestingParty.owningKey])
|
||||
Party(row[table.requestingParty.name], row[table.requestingParty.owningKey])
|
||||
)
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement,
|
||||
@ -58,7 +58,7 @@ class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsT
|
||||
}
|
||||
})
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party.Full) {
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val conflict = committedStates.locked {
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
for (inputState in states) {
|
||||
|
@ -107,7 +107,7 @@ class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterA
|
||||
.build()
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party.Full) {
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) }
|
||||
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
|
@ -13,7 +13,7 @@ class RaftValidatingNotaryService(services: ServiceHubInternal,
|
||||
val type = ValidatingNotaryService.type.getSubType("raft")
|
||||
}
|
||||
|
||||
override fun createFlow(otherParty: Party.Full): ValidatingNotaryFlow {
|
||||
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
|
||||
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ class SimpleNotaryService(services: ServiceHubInternal,
|
||||
val type = ServiceType.notary.getSubType("simple")
|
||||
}
|
||||
|
||||
override fun createFlow(otherParty: Party.Full): NotaryFlow.Service {
|
||||
override fun createFlow(otherParty: Party): NotaryFlow.Service {
|
||||
return NotaryFlow.Service(otherParty, timestampChecker, uniquenessProvider)
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ class ValidatingNotaryService(services: ServiceHubInternal,
|
||||
val type = ServiceType.notary.getSubType("validating")
|
||||
}
|
||||
|
||||
override fun createFlow(otherParty: Party.Full): ValidatingNotaryFlow {
|
||||
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
|
||||
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
override fun generateSpend(tx: TransactionBuilder,
|
||||
amount: Amount<Currency>,
|
||||
to: CompositeKey,
|
||||
onlyFromParties: Set<Party.Full>?): Pair<TransactionBuilder, List<CompositeKey>> {
|
||||
onlyFromParties: Set<Party>?): Pair<TransactionBuilder, List<CompositeKey>> {
|
||||
// Discussion
|
||||
//
|
||||
// This code is analogous to the Wallet.send() set of methods in bitcoinj, and has the same general outline.
|
||||
|
@ -29,11 +29,11 @@ import java.time.LocalDateTime
|
||||
*/
|
||||
object JsonSupport {
|
||||
interface PartyObjectMapper {
|
||||
fun partyFromName(partyName: String): Party.Full?
|
||||
fun partyFromName(partyName: String): Party?
|
||||
}
|
||||
|
||||
class RpcObjectMapper(val rpc: CordaRPCOps) : PartyObjectMapper, ObjectMapper() {
|
||||
override fun partyFromName(partyName: String): Party.Full? = rpc.partyFromName(partyName)
|
||||
override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
|
||||
}
|
||||
class IdentityObjectMapper(val identityService: IdentityService) : PartyObjectMapper, ObjectMapper(){
|
||||
override fun partyFromName(partyName: String) = identityService.partyFromName(partyName)
|
||||
@ -53,8 +53,8 @@ object JsonSupport {
|
||||
|
||||
val cordaModule: Module by lazy {
|
||||
SimpleModule("core").apply {
|
||||
addSerializer(Party.Full::class.java, PartySerializer)
|
||||
addDeserializer(Party.Full::class.java, PartyDeserializer)
|
||||
addSerializer(Party::class.java, PartySerializer)
|
||||
addDeserializer(Party::class.java, PartyDeserializer)
|
||||
addSerializer(BigDecimal::class.java, ToStringSerializer)
|
||||
addDeserializer(BigDecimal::class.java, NumberDeserializers.BigDecimalDeserializer())
|
||||
addSerializer(SecureHash::class.java, SecureHashSerializer)
|
||||
@ -120,14 +120,14 @@ object JsonSupport {
|
||||
|
||||
}
|
||||
|
||||
object PartySerializer : JsonSerializer<Party.Full>() {
|
||||
override fun serialize(obj: Party.Full, generator: JsonGenerator, provider: SerializerProvider) {
|
||||
object PartySerializer : JsonSerializer<Party>() {
|
||||
override fun serialize(obj: Party, generator: JsonGenerator, provider: SerializerProvider) {
|
||||
generator.writeString(obj.name)
|
||||
}
|
||||
}
|
||||
|
||||
object PartyDeserializer : JsonDeserializer<Party.Full>() {
|
||||
override fun deserialize(parser: JsonParser, context: DeserializationContext): Party.Full {
|
||||
object PartyDeserializer : JsonDeserializer<Party>() {
|
||||
override fun deserialize(parser: JsonParser, context: DeserializationContext): Party {
|
||||
if (parser.currentToken == JsonToken.FIELD_NAME) {
|
||||
parser.nextToken()
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ object ServiceIdentityGenerator {
|
||||
|
||||
val keyPairs = (1..dirs.size).map { generateKeyPair() }
|
||||
val notaryKey = CompositeKey.Builder().addKeys(keyPairs.map { it.public.composite }).build(threshold)
|
||||
val notaryParty = Party.Full(serviceName, notaryKey).serialize()
|
||||
val notaryParty = Party(serviceName, notaryKey).serialize()
|
||||
|
||||
keyPairs.zip(dirs) { keyPair, dir ->
|
||||
Files.createDirectories(dir)
|
||||
|
@ -490,8 +490,8 @@ class TwoPartyTradeFlowTests {
|
||||
private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.fillUpForBuyer(
|
||||
withError: Boolean,
|
||||
owner: CompositeKey,
|
||||
issuer: Party.Full,
|
||||
notary: Party.Full): Pair<Vault, List<WireTransaction>> {
|
||||
issuer: Party,
|
||||
notary: Party): Pair<Vault, List<WireTransaction>> {
|
||||
val interimOwnerKey = MEGA_CORP_PUBKEY
|
||||
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
|
||||
// wants to sell to Bob.
|
||||
@ -538,7 +538,7 @@ class TwoPartyTradeFlowTests {
|
||||
owner: CompositeKey,
|
||||
amount: Amount<Issued<Currency>>,
|
||||
attachmentID: SecureHash?,
|
||||
notary: Party.Full): Pair<Vault, List<WireTransaction>> {
|
||||
notary: Party): Pair<Vault, List<WireTransaction>> {
|
||||
val ap = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
output("alice's paper", notary = notary) {
|
||||
CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), owner, amount, TEST_TX_TIME + 7.days)
|
||||
|
@ -65,7 +65,7 @@ open class MockServiceHubInternal(
|
||||
private val txStorageService: TxWritableStorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
|
||||
private val flowFactories = ConcurrentHashMap<Class<*>, (Party.Full) -> FlowLogic<*>>()
|
||||
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
|
||||
|
||||
lateinit var smm: StateMachineManager
|
||||
|
||||
@ -83,11 +83,11 @@ open class MockServiceHubInternal(
|
||||
return smm.executor.fetchFrom { smm.add(logic) }
|
||||
}
|
||||
|
||||
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party.Full) -> FlowLogic<*>) {
|
||||
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
|
||||
flowFactories[markerClass.java] = flowFactory
|
||||
}
|
||||
|
||||
override fun getFlowFactory(markerClass: Class<*>): ((Party.Full) -> FlowLogic<*>)? {
|
||||
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {
|
||||
return flowFactories[markerClass]
|
||||
}
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ class NotaryChangeTests {
|
||||
@Test
|
||||
fun `should throw when a participant refuses to change Notary`() {
|
||||
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
|
||||
val newEvilNotary = Party.Full("Evil Notary", generateKeyPair().public)
|
||||
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
|
||||
val flow = Instigator(state, newEvilNotary)
|
||||
val future = clientNodeA.services.startFlow(flow)
|
||||
|
||||
@ -177,7 +177,7 @@ fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: A
|
||||
return stateAndRef
|
||||
}
|
||||
|
||||
fun issueInvalidState(node: AbstractNode, notary: Party.Full): StateAndRef<*> {
|
||||
fun issueInvalidState(node: AbstractNode, notary: Party): StateAndRef<*> {
|
||||
val tx = DummyContract.generateInitial(node.info.legalIdentity.ref(0), Random().nextInt(), notary)
|
||||
tx.setTime(Instant.now(), 30.seconds)
|
||||
val nodeKey = node.services.legalIdentityKey
|
||||
|
@ -31,8 +31,8 @@ class ScheduledFlowTests {
|
||||
lateinit var nodeB: MockNetwork.MockNode
|
||||
|
||||
data class ScheduledState(val creationTime: Instant,
|
||||
val source: Party.Full,
|
||||
val destination: Party.Full,
|
||||
val source: Party,
|
||||
val destination: Party,
|
||||
val processed: Boolean = false,
|
||||
override val linearId: UniqueIdentifier = UniqueIdentifier(),
|
||||
override val contract: Contract = DummyContract()) : SchedulableState, LinearState {
|
||||
@ -52,7 +52,7 @@ class ScheduledFlowTests {
|
||||
}
|
||||
}
|
||||
|
||||
class InsertInitialStateFlow(val destination: Party.Full) : FlowLogic<Unit>() {
|
||||
class InsertInitialStateFlow(val destination: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val scheduledState = ScheduledState(serviceHub.clock.instant(),
|
||||
@ -87,7 +87,7 @@ class ScheduledFlowTests {
|
||||
|
||||
class ScheduledFlowTestPlugin : CordaPluginRegistry() {
|
||||
override val requiredFlows: Map<String, Set<String>> = mapOf(
|
||||
InsertInitialStateFlow::class.java.name to setOf(Party.Full::class.java.name),
|
||||
InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name),
|
||||
ScheduledFlow::class.java.name to setOf(StateRef::class.java.name)
|
||||
)
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ class DataVendingServiceTests {
|
||||
}
|
||||
|
||||
|
||||
private class NotifyTxFlow(val otherParty: Party.Full, val stx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, NotifyTxRequest(stx))
|
||||
}
|
||||
|
@ -409,7 +409,7 @@ class StateMachineManagerTests {
|
||||
.withMessage("Chain")
|
||||
}
|
||||
|
||||
private class SendAndReceiveFlow(val otherParty: Party.Full, val payload: Any) : FlowLogic<Unit>() {
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
sendAndReceive<Any>(otherParty, payload)
|
||||
@ -449,7 +449,7 @@ class StateMachineManagerTests {
|
||||
)
|
||||
}
|
||||
|
||||
private class ConditionalExceptionFlow(val otherParty: Party.Full, val sendPayload: Any) : FlowLogic<Unit>() {
|
||||
private class ConditionalExceptionFlow(val otherParty: Party, val sendPayload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val throwException = receive<Boolean>(otherParty).unwrap { it }
|
||||
@ -462,12 +462,12 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `retry subFlow due to receiving FlowException`() {
|
||||
class AskForExceptionFlow(val otherParty: Party.Full, val throwException: Boolean) : FlowLogic<String>() {
|
||||
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
|
||||
}
|
||||
|
||||
class RetryOnExceptionFlow(val otherParty: Party.Full) : FlowLogic<String>() {
|
||||
class RetryOnExceptionFlow(val otherParty: Party) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
return try {
|
||||
@ -556,7 +556,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
|
||||
private class SendFlow(val payload: Any, vararg val otherParties: Party.Full) : FlowLogic<Unit>() {
|
||||
private class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
}
|
||||
@ -566,7 +566,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
|
||||
private class ReceiveFlow(vararg val otherParties: Party.Full) : FlowLogic<Unit>() {
|
||||
private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
private var nonTerminating: Boolean = false
|
||||
|
||||
init {
|
||||
@ -589,7 +589,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private class PingPongFlow(val otherParty: Party.Full, val payload: Long) : FlowLogic<Unit>() {
|
||||
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
|
||||
@Transient var receivedPayload: Long? = null
|
||||
@Transient var receivedPayload2: Long? = null
|
||||
|
||||
|
Reference in New Issue
Block a user