Merge remote-tracking branch 'origin/release/os/4.6' into christians/ENT-5273-update-from-os-4.6-20200803

This commit is contained in:
Christian Sailer 2020-08-03 16:28:20 +01:00
commit 89759510ae
33 changed files with 1737 additions and 288 deletions

View File

@ -13,6 +13,7 @@ see changes to this list.
* agoldvarg
* Ajitha Thayaharan (BCS Technology International)
* Alberto Arri (R3)
* Alex Karnezis
* amiracam
* Amol Pednekar
* Andras Slemmer (R3)

View File

@ -96,5 +96,5 @@ class FinalityFlowTests : WithFinality {
}
/** "Old" CorDapp which will force its node to keep its FinalityHandler enabled */
private fun tokenOldCordapp() = cordappWithPackages("com.template").copy(targetPlatformVersion = 3)
private fun tokenOldCordapp() = cordappWithPackages().copy(targetPlatformVersion = 3)
}

View File

@ -9,23 +9,11 @@
<ID>ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step</ID>
<ID>ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_</ID>
<ID>ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$AWAITING_REQUEST : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$SENDING_TOP_UP_ISSUE_REQUEST : Step</ID>
<ID>ClassNaming:DeserializeNeedingCarpentryTests.kt$DeserializeNeedingCarpentryTests$outer</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$FINALISING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$GENERATING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$SIGNING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$VERIFYING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$EXTRACTING_VAULT_STATES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$ID_OTHER_NODES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$OTHER_TX_COMPONENTS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SENDING_AND_RECEIVING_DATA : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SIGS_GATHERING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_BUILDING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_SIGNING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_VERIFICATION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$VERIFYING_SIGS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$ResponderFlow.Companion$RECEIVING_AND_SENDING_DATA : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$ExceptionFlow$START_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$RECEIVED_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$START_STEP : Step</ID>
@ -178,7 +166,6 @@
<ID>ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage)</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection?</ID>
<ID>ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type</ID>
<ID>ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test(timeout=300_000) fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`()</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null )</ID>
<ID>ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void?</ID>
@ -310,7 +297,6 @@
<ID>ForbiddenComment:CordappProviderImplTests.kt$CordappProviderImplTests.Companion$// TODO: Cordapp name should differ from the JAR name</ID>
<ID>ForbiddenComment:CoreFlowHandlers.kt$NotaryChangeHandler$// TODO: Right now all nodes will automatically approve the notary change. We need to figure out if stricter controls are necessary.</ID>
<ID>ForbiddenComment:CrossCashTest.kt$CrossCashState$// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: Check if non-ECC keys satisfy params (i.e. approved/valid RSA modulus size).</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: We currently use SHA256(seed) when retrying, but BIP32 just skips a counter (i) that results to an invalid key.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change the val name to a more descriptive one as it's now confusing and looks like a Key type.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change val name to SPHINCS256_SHA512. This will break backwards compatibility.</ID>
@ -435,7 +421,6 @@
<ID>ForbiddenComment:RatesFixFlow.kt$RatesFixFlow.FixQueryFlow$// TODO: add deadline to receive</ID>
<ID>ForbiddenComment:ResolveTransactionsFlow.kt$ResolveTransactionsFlow$// TODO: This could be done in parallel with other fetches for extra speed.</ID>
<ID>ForbiddenComment:ResolveTransactionsFlowTest.kt$ResolveTransactionsFlowTest$// TODO: this operation should not require an explicit transaction</ID>
<ID>ForbiddenComment:RestrictedEntityManager.kt$RestrictedEntityManager$// TODO: Figure out which other methods on EntityManager need to be blocked?</ID>
<ID>ForbiddenComment:ScheduledActivityObserver.kt$ScheduledActivityObserver.Companion$// TODO: Beware we are calling dynamically loaded contract code inside here.</ID>
<ID>ForbiddenComment:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$// TODO: the queries below are not atomic so we need to allow enough time for the scheduler to finish. Would be better to query scheduler.</ID>
<ID>ForbiddenComment:SendTransactionFlow.kt$DataVendingFlow$// Security TODO: Check for abnormally large or malformed data requests</ID>
@ -622,7 +607,6 @@
<ID>FunctionNaming:VersionExtractorTest.kt$VersionExtractorTest$@Test(timeout=300_000) fun version_header_extraction_present()</ID>
<ID>LargeClass:AbstractNode.kt$AbstractNode&lt;S&gt; : SingletonSerializeAsToken</ID>
<ID>LargeClass:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager : StateMachineManagerStateMachineManagerInternal</ID>
<ID>LongMethod:FlowCookbook.kt$InitiatorFlow$@Suppress("RemoveExplicitTypeArguments") @Suspendable override fun call()</ID>
<ID>LongMethod:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>LongParameterList:AMQPSerializer.kt$AMQPSerializer$(obj: Any, data: Data, type: Type, output: SerializationOutput, context: SerializationContext, debugIndent: Int = 0)</ID>
<ID>LongParameterList:AbstractCashSelection.kt$AbstractCashSelection$(connection: Connection, amount: Amount&lt;Currency&gt;, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set&lt;AbstractParty&gt;, withIssuerRefs: Set&lt;OpaqueBytes&gt;, withResultSet: (ResultSet) -&gt; Boolean)</ID>
@ -634,7 +618,6 @@
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcTests.kt$ArtemisRpcTests$(nodeSSlconfig: MutualSslConfiguration, brokerSslOptions: BrokerRpcSslOptions?, useSslForBroker: Boolean, clientSslOptions: ClientRpcSslOptions?, address: NetworkHostAndPort = ports.nextHostAndPort(), adminAddress: NetworkHostAndPort = ports.nextHostAndPort(), baseDirectory: Path = tempFolder.root.toPath() )</ID>
<ID>LongParameterList:AttachmentsClassLoader.kt$AttachmentsClassLoaderBuilder$(attachments: List&lt;Attachment&gt;, params: NetworkParameters, txId: SecureHash, isAttachmentTrusted: (Attachment) -&gt; Boolean, parent: ClassLoader = ClassLoader.getSystemClassLoader(), block: (ClassLoader) -&gt; T)</ID>
<ID>LongParameterList:BFTSmart.kt$BFTSmart.Replica$( states: List&lt;StateRef&gt;, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List&lt;StateRef&gt; = emptyList() )</ID>
<ID>LongParameterList:BusinessCalendar.kt$BusinessCalendar.Companion$(startDate: LocalDate, period: Frequency, calendar: BusinessCalendar = EMPTY, dateRollConvention: DateRollConvention = DateRollConvention.Following, noOfAdditionalPeriods: Int = Integer.MAX_VALUE, endDate: LocalDate? = null, periodOffset: Int? = null)</ID>
<ID>LongParameterList:Cash.kt$Cash$(inputs: List&lt;State&gt;, outputs: List&lt;State&gt;, tx: LedgerTransaction, issueCommand: CommandWithParties&lt;Commands.Issue&gt;, currency: Currency, issuer: PartyAndReference)</ID>
@ -732,7 +715,6 @@
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, issuerSigner: ContentSigner, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Date, Date&gt;, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuer: X500Principal, issuerPublicKey: PublicKey, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Date, Date&gt;, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:X509Utilities.kt$X509Utilities$(certificateType: CertificateType, issuerCertificate: X509Certificate, issuerKeyPair: KeyPair, subject: X500Principal, subjectPublicKey: PublicKey, validityWindow: Pair&lt;Duration, Duration&gt; = DEFAULT_VALIDITY_WINDOW, nameConstraints: NameConstraints? = null, crlDistPoint: String? = null, crlIssuer: X500Name? = null)</ID>
<ID>LongParameterList:internalAccessTestHelpers.kt$( inputs: List&lt;StateAndRef&lt;ContractState&gt;&gt;, outputs: List&lt;TransactionState&lt;ContractState&gt;&gt;, commands: List&lt;CommandWithParties&lt;CommandData&gt;&gt;, attachments: List&lt;Attachment&gt;, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt, networkParameters: NetworkParameters, references: List&lt;StateAndRef&lt;ContractState&gt;&gt;, componentGroups: List&lt;ComponentGroup&gt;? = null, serializedInputs: List&lt;SerializedStateAndRef&gt;? = null, serializedReferences: List&lt;SerializedStateAndRef&gt;? = null, isAttachmentTrusted: (Attachment) -&gt; Boolean )</ID>
<ID>MagicNumber:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$128</ID>
<ID>MagicNumber:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$128</ID>
<ID>MagicNumber:AMQPServer.kt$AMQPServer$100</ID>
@ -753,7 +735,6 @@
<ID>MagicNumber:AttachmentDemo.kt$10009</ID>
<ID>MagicNumber:AttachmentDemo.kt$10010</ID>
<ID>MagicNumber:AttachmentTrustTable.kt$AttachmentTrustTable$3</ID>
<ID>MagicNumber:AttachmentsClassLoader.kt$AttachmentsClassLoader$4</ID>
<ID>MagicNumber:AzureSmbVolume.kt$AzureSmbVolume$5000</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Client$100</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Replica.&lt;no name provided&gt;$20000</ID>
@ -775,12 +756,6 @@
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$16</ID>
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$30.0</ID>
<ID>MagicNumber:ClassCarpenter.kt$ClassCarpenterImpl$3</ID>
<ID>MagicNumber:ClientRpcExample.kt$ClientRpcExample$3</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.7</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.8</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$1000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$10000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$2000</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$10</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$30</ID>
<ID>MagicNumber:CompositeSignature.kt$CompositeSignature$1024</ID>
@ -846,11 +821,6 @@
<ID>MagicNumber:ExchangeRateModel.kt$1.18</ID>
<ID>MagicNumber:ExchangeRateModel.kt$1.31</ID>
<ID>MagicNumber:FixingFlow.kt$FixingFlow.Fixer.&lt;no name provided&gt;$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$45</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$777</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow$99</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow.&lt;no name provided&gt;$777</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic$300</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic.Companion$5</ID>
<ID>MagicNumber:FlowMonitor.kt$FlowMonitor$1000</ID>
@ -864,7 +834,6 @@
<ID>MagicNumber:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$10</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$5</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$60</ID>
<ID>MagicNumber:IOUFlowResponder.kt$IOUFlowResponder.&lt;no name provided&gt;$100</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$360.0</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$4</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$8</ID>
@ -1026,7 +995,6 @@
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$100</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$32768</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$40</ID>
<ID>MagicNumber:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$4</ID>
<ID>MagicNumber:Notarise.kt$10</ID>
<ID>MagicNumber:Notarise.kt$10003</ID>
<ID>MagicNumber:NullKeys.kt$NullKeys$32</ID>
@ -1143,7 +1111,6 @@
<ID>MagicNumber:StandaloneShell.kt$StandaloneShell$7</ID>
<ID>MagicNumber:StateRevisionFlow.kt$StateRevisionFlow.Requester$30</ID>
<ID>MagicNumber:Structures.kt$PrivacySalt$32</ID>
<ID>MagicNumber:TargetVersionDependentRules.kt$StateContractValidationEnforcementRule$4</ID>
<ID>MagicNumber:TestNodeInfoBuilder.kt$TestNodeInfoBuilder$1234</ID>
<ID>MagicNumber:TestUtils.kt$10000</ID>
<ID>MagicNumber:TestUtils.kt$30000</ID>
@ -1154,7 +1121,6 @@
<ID>MagicNumber:TraderDemoClientApi.kt$TraderDemoClientApi$3</ID>
<ID>MagicNumber:TransactionBuilder.kt$TransactionBuilder$4</ID>
<ID>MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30</ID>
<ID>MagicNumber:TransactionUtils.kt$4</ID>
<ID>MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$15.0</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$20.0</ID>
@ -1179,8 +1145,6 @@
<ID>MagicNumber:WebServer.kt$100.0</ID>
<ID>MagicNumber:WebServer.kt$WebServer$500</ID>
<ID>MagicNumber:WireTransaction.kt$WireTransaction$4</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitCompletionFlow$60</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitTradeApprovalFlow$60</ID>
<ID>MagicNumber:X509Utilities.kt$X509Utilities$3650</ID>
<ID>MagicNumber:errorAndTerminate.kt$10</ID>
<ID>MatchingDeclarationName:AMQPSerializerFactories.kt$net.corda.serialization.internal.amqp.AMQPSerializerFactories.kt</ID>
@ -1225,7 +1189,6 @@
<ID>MatchingDeclarationName:TestConstants.kt$net.corda.testing.core.TestConstants.kt</ID>
<ID>MatchingDeclarationName:TestUtils.kt$net.corda.testing.core.TestUtils.kt</ID>
<ID>MatchingDeclarationName:TransactionTypes.kt$net.corda.explorer.model.TransactionTypes.kt</ID>
<ID>MatchingDeclarationName:TutorialFlowStateMachines.kt$net.corda.docs.kotlin.tutorial.flowstatemachines.TutorialFlowStateMachines.kt</ID>
<ID>MatchingDeclarationName:Utils.kt$io.cryptoblk.core.Utils.kt</ID>
<ID>MatchingDeclarationName:VirtualCordapps.kt$net.corda.node.internal.cordapp.VirtualCordapps.kt</ID>
<ID>ModifierOrder:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected</ID>
@ -1317,13 +1280,8 @@
<ID>SpreadOperator:FlowFrameworkTripartyTests.kt$FlowFrameworkTripartyTests$(*expected)</ID>
<ID>SpreadOperator:FlowLogicRefFactoryImpl.kt$FlowLogicRefFactoryImpl$(flowClass, *args)</ID>
<ID>SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeAClasses.toTypedArray())</ID>
<ID>SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeBClasses.toTypedArray())</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(*allSessions)</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(session, *sessions)</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$(OpaqueBytes(request.encoded), "Platform-Version" to "${versionInfo.platformVersion}", "Client-Version" to versionInfo.releaseVersion, "Private-Network-Map" to (config.pnm?.toString() ?: ""), *(config.csrToken?.let { arrayOf(CENM_SUBMISSION_TOKEN to it) } ?: arrayOf()))</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray())</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())</ID>
@ -1453,7 +1411,6 @@
<ID>ThrowsCount:StructuresTests.kt$AttachmentTest$@Test(timeout=300_000) fun `openAsJAR does not leak file handle if attachment has corrupted manifest`()</ID>
<ID>ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$ private fun getUniqueContractAttachmentsByContract(): Map&lt;ContractClassName, ContractAttachment&gt;</ID>
<ID>ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$// Using basic graph theory, a full cycle of encumbered (co-dependent) states should exist to achieve bi-directional // encumbrances. This property is important to ensure that no states involved in an encumbrance-relationship // can be spent on their own. Briefly, if any of the states is having more than one encumbrance references by // other states, a full cycle detection will fail. As a result, all of the encumbered states must be present // as "from" and "to" only once (or zero times if no encumbrance takes place). For instance, // a -&gt; b // c -&gt; b and a -&gt; b // b -&gt; a b -&gt; c // do not satisfy the bi-directionality (full cycle) property. // // In the first example "b" appears twice in encumbrance ("to") list and "c" exists in the encumbered ("from") list only. // Due the above, one could consume "a" and "b" in the same transaction and then, because "b" is already consumed, "c" cannot be spent. // // Similarly, the second example does not form a full cycle because "a" and "c" exist in one of the lists only. // As a result, one can consume "b" and "c" in the same transactions, which will make "a" impossible to be spent. // // On other hand the following are valid constructions: // a -&gt; b a -&gt; c // b -&gt; c and c -&gt; b // c -&gt; a b -&gt; a // and form a full cycle, meaning that the bi-directionality property is satisfied. private fun checkBidirectionalOutputEncumbrances(statesAndEncumbrance: List&lt;Pair&lt;Int, Int&gt;&gt;)</ID>
<ID>ThrowsCount:WireTransaction.kt$WireTransaction$private fun toLedgerTransactionInternal( resolveIdentity: (PublicKey) -&gt; Party?, resolveAttachment: (SecureHash) -&gt; Attachment?, resolveStateRefAsSerialized: (StateRef) -&gt; SerializedBytes&lt;TransactionState&lt;ContractState&gt;&gt;?, resolveParameters: (SecureHash?) -&gt; NetworkParameters?, isAttachmentTrusted: (Attachment) -&gt; Boolean ): LedgerTransaction</ID>
<ID>ThrowsCount:WireTransaction.kt$WireTransaction.Companion$ @CordaInternal fun resolveStateRefBinaryComponent(stateRef: StateRef, services: ServicesForResolution): SerializedBytes&lt;TransactionState&lt;ContractState&gt;&gt;?</ID>
<ID>TooGenericExceptionCaught:AMQPChannelHandler.kt$AMQPChannelHandler$ex: Exception</ID>
<ID>TooGenericExceptionCaught:AMQPExceptions.kt$th: Throwable</ID>
@ -1502,7 +1459,6 @@
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl.Companion$th: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorHandling.kt$ErrorHandling.CheckpointAfterErrorFlow$t: Throwable</ID>
<ID>TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception</ID>
<ID>TooGenericExceptionCaught:Eventually.kt$e: Exception</ID>
@ -1574,7 +1530,6 @@
<ID>TooGenericExceptionCaught:NotaryUtils.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ObjectDiffer.kt$ObjectDiffer$throwable: Exception</ID>
<ID>TooGenericExceptionCaught:P2PMessagingClient.kt$P2PMessagingClient$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentIdentityMigrationNewTableTest.kt$PersistentIdentityMigrationNewTableTest$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$e: Exception</ID>
<ID>TooGenericExceptionCaught:ProfileController.kt$ProfileController$e: Exception</ID>
<ID>TooGenericExceptionCaught:PropertyValidationTest.kt$PropertyValidationTest$e: Exception</ID>
@ -1715,6 +1670,7 @@
<ID>TooManyFunctions:RPCApi.kt$net.corda.nodeapi.RPCApi.kt</ID>
<ID>TooManyFunctions:RPCClientProxyHandler.kt$RPCClientProxyHandler : InvocationHandler</ID>
<ID>TooManyFunctions:RPCServer.kt$RPCServer</ID>
<ID>TooManyFunctions:SSLHelper.kt$net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper.kt</ID>
<ID>TooManyFunctions:SerializationHelper.kt$net.corda.serialization.internal.amqp.SerializationHelper.kt</ID>
<ID>TooManyFunctions:ServiceHub.kt$ServiceHub : ServicesForResolution</ID>
<ID>TooManyFunctions:SignedTransaction.kt$SignedTransaction : TransactionWithSignatures</ID>
@ -1742,8 +1698,6 @@
<ID>UnusedImports:Amount.kt$import net.corda.core.crypto.CompositeKey</ID>
<ID>UnusedImports:Amount.kt$import net.corda.core.identity.Party</ID>
<ID>UnusedImports:DummyLinearStateSchemaV1.kt$import net.corda.core.contracts.ContractState</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.core.internal.packageName</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.finance.schemas.CashSchemaV1</ID>
<ID>UnusedImports:InternalTestUtils.kt$import java.nio.file.Files</ID>
<ID>UnusedImports:InternalTestUtils.kt$import net.corda.nodeapi.internal.loadDevCaTrustStore</ID>
<ID>UnusedImports:NetworkMap.kt$import net.corda.core.node.NodeInfo</ID>
@ -2012,8 +1966,6 @@
<ID>WildcardImport:CordaModule.kt$import net.corda.core.identity.*</ID>
<ID>WildcardImport:CordaModule.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CordaViewModel.kt$import tornadofx.*</ID>
<ID>WildcardImport:Cordapp.kt$import net.corda.core.cordapp.Cordapp.Info.*</ID>
@ -2031,10 +1983,6 @@
<ID>WildcardImport:CryptoSignUtils.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:CryptoUtilsTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CustomCordapp.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.utilities.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:DBNetworkParametersStorage.kt$import javax.persistence.*</ID>
<ID>WildcardImport:DBRunnerExtension.kt$import org.junit.jupiter.api.extension.*</ID>
<ID>WildcardImport:DBTransactionStorage.kt$import javax.persistence.*</ID>
@ -2057,7 +2005,6 @@
<ID>WildcardImport:DeserializeSimpleTypesTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:DigitalSignatureWithCert.kt$import java.security.cert.*</ID>
<ID>WildcardImport:DistributedServiceTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:DoRemainingWorkTransition.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:DockerInstantiator.kt$import com.github.dockerjava.api.model.*</ID>
<ID>WildcardImport:DriverDSLImpl.kt$import net.corda.testing.driver.*</ID>
<ID>WildcardImport:DummyContract.kt$import net.corda.core.contracts.*</ID>
@ -2076,8 +2023,6 @@
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import kotlin.test.*</ID>
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Explorer.kt$import tornadofx.*</ID>
<ID>WildcardImport:FiberDeserializationCheckingInterceptor.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:FinalityFlowMigration.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FinalityHandlerTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
@ -2089,11 +2034,7 @@
<ID>WildcardImport:FlowCheckpointCordapp.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkPersistenceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowMatchers.kt$import net.corda.coretesting.internal.matchers.*</ID>
@ -2101,10 +2042,7 @@
<ID>WildcardImport:FlowRetryTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStackSnapshotTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachine.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowsDrainingModeContentionTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FxTransactionBuildTutorialTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:GenericsTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Gui.kt$import tornadofx.*</ID>
<ID>WildcardImport:GuiUtilities.kt$import tornadofx.*</ID>
@ -2121,8 +2059,6 @@
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.EqualityComparisonOperator.*</ID>
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.LikenessOperator.*</ID>
<ID>WildcardImport:HibernateStatistics.kt$import org.hibernate.stat.*</ID>
<ID>WildcardImport:IOUContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IOUFlowResponder.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.finance.contracts.*</ID>
<ID>WildcardImport:IRSState.kt$import net.corda.core.contracts.*</ID>
@ -2169,7 +2105,6 @@
<ID>WildcardImport:JarSignatureCollectorTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import java.security.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KotlinIntegrationTestingTutorial.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:Kryo.kt$import com.esotericsoftware.kryo.*</ID>
<ID>WildcardImport:Kryo.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:KryoStreamsTest.kt$import java.io.*</ID>
@ -2420,8 +2355,6 @@
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.node.services.statemachine.interceptors.*</ID>
<ID>WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.*</ID>
@ -2473,8 +2406,6 @@
<ID>WildcardImport:TransactionViewer.kt$import net.corda.client.jfx.utils.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import tornadofx.*</ID>
<ID>WildcardImport:TutorialContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TutorialTestDSL.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:TwoPartyDealFlow.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.flows.*</ID>
@ -2502,7 +2433,6 @@
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:VaultFiller.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:VaultFlowTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.QueryCriteria.*</ID>

View File

@ -200,10 +200,7 @@ internal fun createClientSslHelper(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
@ -239,10 +236,7 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
internal fun createServerSslHandler(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine()
sslEngine.useClientMode = false
sslEngine.needClientAuth = true
@ -256,6 +250,15 @@ internal fun createServerSslHandler(keyStore: CertificateStore,
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
fun createAndInitSslContext(keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory): SSLContext {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java)
.map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, newSecureRandom())
return sslContext
}
@VisibleForTesting
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revocationConfig: RevocationConfig): ManagerFactoryParameters {
val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector())

View File

@ -0,0 +1,216 @@
package net.corda.node.amqp;
import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManagerFactory;
/**
* An SSL/TLS client that connects to a server using its IP address and port.
* <p/>
* After initialization of a {@link NioSslClient} object, {@link NioSslClient#connect()} should be called,
* in order to establish connection with the server.
* <p/>
* When the connection between the client and the object is established, {@link NioSslClient} provides
* a public write and read method, in order to communicate with its peer.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class NioSslClient extends NioSslPeer {
/**
* The remote address of the server this client is configured to connect to.
*/
private final String remoteAddress;
/**
* The port of the server this client is configured to connect to.
*/
private final int port;
/**
* The engine that will be used to encrypt/decrypt data between this client and the server.
*/
private final SSLEngine engine;
/**
* The socket channel that will be used as the transport link between this client and the server.
*/
private SocketChannel socketChannel;
/**
* Initiates the engine to run as a client using peer information, and allocates space for the
* buffers that will be used by the engine.
*
* @param remoteAddress The IP address of the peer.
* @param port The peer's port that will be used.
*/
public NioSslClient(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String remoteAddress, int port) {
this.remoteAddress = remoteAddress;
this.port = port;
SSLContext context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
engine = context.createSSLEngine(remoteAddress, port);
engine.setUseClientMode(true);
SSLSession session = engine.getSession();
myAppData = ByteBuffer.allocate(1024);
myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
peerAppData = ByteBuffer.allocate(1024);
peerNetData = ByteBuffer.allocate(session.getPacketBufferSize());
}
/**
* Opens a socket channel to communicate with the configured server and tries to complete the handshake protocol.
*
* @return True if client established a connection with the server, false otherwise.
*/
public boolean connect() throws Exception {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(remoteAddress, port));
while (!socketChannel.finishConnect()) {
// can do something here...
}
engine.beginHandshake();
return doHandshake(socketChannel, engine);
}
/**
* Public method to send a message to the server.
*
* @param message - message to be sent to the server.
* @throws IOException if an I/O error occurs to the socket channel.
*/
public void write(String message) throws IOException {
write(socketChannel, engine, message);
}
/**
* Implements the write method that sends a message to the server the client is connected to,
* but should not be called by the user, since socket channel and engine are inner class' variables.
* {@link NioSslClient#write(String)} should be called instead.
*
* @param message - message to be sent to the server.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
log.debug("About to write to the server...");
myAppData.clear();
myAppData.put(message.getBytes());
myAppData.flip();
while (myAppData.hasRemaining()) {
// The loop has a meaning for (outgoing) messages larger than 16KB.
// Every wrap call will remove 16KB from the original message and send it to the remote peer.
myNetData.clear();
SSLEngineResult result = engine.wrap(myAppData, myNetData);
switch (result.getStatus()) {
case OK:
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
log.debug("Message sent to the server: " + message);
break;
case BUFFER_OVERFLOW:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occured after a wrap. I don't think we should ever get here.");
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
}
/**
* Public method to try to read from the server.
*/
public void read() throws Exception {
read(socketChannel, engine);
}
/**
* Will wait for response from the remote peer, until it actually gets something.
* Uses {@link SocketChannel#read(ByteBuffer)}, which is non-blocking, and if
* it gets nothing from the peer, waits for {@code waitToReadMillis} and tries again.
* <p/>
* Just like {@link NioSslPeer#read(SocketChannel, SSLEngine)} it uses inner class' socket channel
* and engine and should not be used by the client. {@link NioSslClient#read()} should be called instead.
*
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
*/
@Override
protected void read(SocketChannel socketChannel, SSLEngine engine) throws Exception {
log.debug("About to read from the server...");
peerNetData.clear();
int waitToReadMillis = 50;
boolean exitReadLoop = false;
while (!exitReadLoop) {
int bytesRead = socketChannel.read(peerNetData);
if (bytesRead > 0) {
peerNetData.flip();
while (peerNetData.hasRemaining()) {
peerAppData.clear();
SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
switch (result.getStatus()) {
case OK:
peerAppData.flip();
log.debug("Server response: " + peerAppDataAsString());
exitReadLoop = true;
break;
case BUFFER_OVERFLOW:
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
} else if (bytesRead < 0) {
handleEndOfStream(socketChannel, engine);
return;
}
Thread.sleep(waitToReadMillis);
}
}
/**
* Should be called when the client wants to explicitly close the connection to the server.
*
* @throws IOException if an I/O error occurs to the socket channel.
*/
public void shutdown() throws IOException {
log.debug("About to close connection with the server...");
closeConnection(socketChannel, engine);
executor.shutdown();
log.debug("Goodbye!");
}
}

View File

@ -0,0 +1,329 @@
package net.corda.node.amqp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* A class that represents an SSL/TLS peer, and can be extended to create a client or a server.
* <p/>
* It makes use of the JSSE framework, and specifically the {@link SSLEngine} logic, which
* is described by Oracle as "an advanced API, not appropriate for casual use", since
* it requires the user to implement much of the communication establishment procedure himself.
* More information about it can be found here: http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLEngine
* <p/>
* {@link NioSslPeer} implements the handshake protocol, required to establish a connection between two peers,
* which is common for both client and server and provides the abstract {@link NioSslPeer#read(SocketChannel, SSLEngine)} and
* {@link NioSslPeer#write(SocketChannel, SSLEngine, String)} methods, that need to be implemented by the specific SSL/TLS peer
* that is going to extend this class.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public abstract class NioSslPeer {
/**
* Class' logger.
*/
protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Will contain this peer's application data in plaintext, that will be later encrypted
* using {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} and sent to the other peer. This buffer can typically
* be of any size, as long as it is large enough to contain this peer's outgoing messages.
* If this peer tries to send a message bigger than buffer's capacity a {@link BufferOverflowException}
* will be thrown.
*/
protected ByteBuffer myAppData;
/**
* Will contain this peer's encrypted data, that will be generated after {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)}
* is applied on {@link NioSslPeer#myAppData}. It should be initialized using {@link SSLSession#getPacketBufferSize()},
* which returns the size up to which, SSL/TLS packets will be generated from the engine under a session.
* All SSLEngine network buffers should be sized at least this large to avoid insufficient space problems when performing wrap and unwrap calls.
*/
protected ByteBuffer myNetData;
/**
* Will contain the other peer's (decrypted) application data. It must be large enough to hold the application data
* from any peer. Can be initialized with {@link SSLSession#getApplicationBufferSize()} for an estimation
* of the other peer's application data and should be enlarged if this size is not enough.
*/
protected ByteBuffer peerAppData;
/**
* Will contain the other peer's encrypted data. The SSL/TLS protocols specify that implementations should produce packets containing at most 16 KB of plaintext,
* so a buffer sized to this value should normally cause no capacity problems. However, some implementations violate the specification and generate large records up to 32 KB.
* If the {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} detects large inbound packets, the buffer sizes returned by SSLSession will be updated dynamically, so the this peer
* should check for overflow conditions and enlarge the buffer using the session's (updated) buffer size.
*/
protected ByteBuffer peerNetData;
/**
* Will be used to execute tasks that may emerge during handshake in parallel with the server's main thread.
*/
protected ExecutorService executor = Executors.newSingleThreadExecutor();
protected abstract void read(SocketChannel socketChannel, SSLEngine engine) throws Exception;
protected abstract void write(SocketChannel socketChannel, SSLEngine engine, String message) throws Exception;
/**
* Implements the handshake protocol between two peers, required for the establishment of the SSL/TLS connection.
* During the handshake, encryption configuration information - such as the list of available cipher suites - will be exchanged
* and if the handshake is successful will lead to an established SSL/TLS session.
*
* <p/>
* A typical handshake will usually contain the following steps:
*
* <ul>
* <li>1. wrap: ClientHello</li>
* <li>2. unwrap: ServerHello/Cert/ServerHelloDone</li>
* <li>3. wrap: ClientKeyExchange</li>
* <li>4. wrap: ChangeCipherSpec</li>
* <li>5. wrap: Finished</li>
* <li>6. unwrap: ChangeCipherSpec</li>
* <li>7. unwrap: Finished</li>
* </ul>
* <p/>
* Handshake is also used during the end of the session, in order to properly close the connection between the two peers.
* A proper connection close will typically include the one peer sending a CLOSE message to another, and then wait for
* the other's CLOSE message to close the transport link. The other peer from his perspective would read a CLOSE message
* from his peer and then enter the handshake procedure to send his own CLOSE message as well.
*
* @param socketChannel - the socket channel that connects the two peers.
* @param engine - the engine that will be used for encryption/decryption of the data exchanged with the other peer.
* @return True if the connection handshake was successful or false if an error occurred.
* @throws IOException - if an error occurs during read/write to the socket channel.
*/
protected boolean doHandshake(SocketChannel socketChannel, SSLEngine engine) throws IOException {
log.debug("About to do handshake...");
SSLEngineResult result;
HandshakeStatus handshakeStatus;
// NioSslPeer's fields myAppData and peerAppData are supposed to be large enough to hold all message data the peer
// will send and expects to receive from the other peer respectively. Since the messages to be exchanged will usually be less
// than 16KB long the capacity of these fields should also be smaller. Here we initialize these two local buffers
// to be used for the handshake, while keeping client's buffers at the same size.
int appBufferSize = engine.getSession().getApplicationBufferSize();
ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
myNetData.clear();
peerNetData.clear();
handshakeStatus = engine.getHandshakeStatus();
while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
switch (handshakeStatus) {
case NEED_UNWRAP:
if (socketChannel.read(peerNetData) < 0) {
if (engine.isInboundDone() && engine.isOutboundDone()) {
return false;
}
try {
engine.closeInbound();
} catch (SSLException e) {
log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close " +
"notification message from the peer, due to end of stream.", e);
}
engine.closeOutbound();
// After closeOutbound the engine will be set to WRAP state, in order to try to send a close message to the client.
handshakeStatus = engine.getHandshakeStatus();
break;
}
peerNetData.flip();
try {
result = engine.unwrap(peerNetData, peerAppData);
peerNetData.compact();
handshakeStatus = result.getHandshakeStatus();
} catch (SSLException sslException) {
log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
" Will try to properly close connection...", sslException);
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
switch (result.getStatus()) {
case OK:
break;
case BUFFER_OVERFLOW:
// Will occur when peerAppData's capacity is smaller than the data derived from peerNetData's unwrap.
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
// Will occur either when no data was read from the peer or when the peerNetData buffer was too small to hold all peer's data.
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
if (engine.isOutboundDone()) {
return false;
} else {
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
break;
case NEED_WRAP:
myNetData.clear();
try {
result = engine.wrap(myAppData, myNetData);
handshakeStatus = result.getHandshakeStatus();
} catch (SSLException sslException) {
log.error("A problem was encountered while processing the data that caused the SSLEngine to abort." +
"Will try to properly close connection...", sslException);
engine.closeOutbound();
handshakeStatus = engine.getHandshakeStatus();
break;
}
switch (result.getStatus()) {
case OK :
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
break;
case BUFFER_OVERFLOW:
// Will occur if there is not enough space in myNetData buffer to write all the data that would be generated by the method wrap.
// Since myNetData is set to session's packet size we should not get to this point because SSLEngine is supposed
// to produce messages smaller or equal to that, but a general handling would be the following:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
case CLOSED:
try {
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
// At this point the handshake status will probably be NEED_UNWRAP so we make sure that peerNetData is clear to read.
peerNetData.clear();
} catch (Exception e) {
log.error("Failed to send server's CLOSE message due to socket channel's failure.");
handshakeStatus = engine.getHandshakeStatus();
}
break;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
break;
case NEED_TASK:
Runnable task;
while ((task = engine.getDelegatedTask()) != null) {
executor.execute(task);
}
handshakeStatus = engine.getHandshakeStatus();
break;
default:
throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
}
}
log.debug("Handshake status: " + handshakeStatus);
return true;
}
protected ByteBuffer enlargePacketBuffer(SSLEngine engine, ByteBuffer buffer) {
return enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
}
protected ByteBuffer enlargeApplicationBuffer(SSLEngine engine, ByteBuffer buffer) {
return enlargeBuffer(buffer, engine.getSession().getApplicationBufferSize());
}
/**
* Compares <code>sessionProposedCapacity<code> with buffer's capacity. If buffer's capacity is smaller,
* returns a buffer with the proposed capacity. If it's equal or larger, returns a buffer
* with capacity twice the size of the initial one.
*
* @param buffer - the buffer to be enlarged.
* @param sessionProposedCapacity - the minimum size of the new buffer, proposed by {@link SSLSession}.
* @return A new buffer with a larger capacity.
*/
protected ByteBuffer enlargeBuffer(ByteBuffer buffer, int sessionProposedCapacity) {
if (sessionProposedCapacity > buffer.capacity()) {
buffer = ByteBuffer.allocate(sessionProposedCapacity);
} else {
buffer = ByteBuffer.allocate(buffer.capacity() * 2);
}
return buffer;
}
/**
* Handles {@link SSLEngineResult.Status#BUFFER_UNDERFLOW}. Will check if the buffer is already filled, and if there is no space problem
* will return the same buffer, so the client tries to read again. If the buffer is already filled will try to enlarge the buffer either to
* session's proposed size or to a larger capacity. A buffer underflow can happen only after an unwrap, so the buffer will always be a
* peerNetData buffer.
*
* @param buffer - will always be peerNetData buffer.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @return The same buffer if there is no space problem or a new buffer with the same data but more space.
*/
protected ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) {
if (engine.getSession().getPacketBufferSize() < buffer.limit()) {
return buffer;
} else {
ByteBuffer replaceBuffer = enlargePacketBuffer(engine, buffer);
buffer.flip();
replaceBuffer.put(buffer);
return replaceBuffer;
}
}
/**
* This method should be called when this peer wants to explicitly close the connection
* or when a close message has arrived from the other peer, in order to provide an orderly shutdown.
* <p/>
* It first calls {@link SSLEngine#closeOutbound()} which prepares this peer to send its own close message and
* sets {@link SSLEngine} to the <code>NEED_WRAP</code> state. Then, it delegates the exchange of close messages
* to the handshake method and finally, it closes socket channel.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
protected void closeConnection(SocketChannel socketChannel, SSLEngine engine) throws IOException {
engine.closeOutbound();
doHandshake(socketChannel, engine);
socketChannel.close();
}
/**
* In addition to orderly shutdowns, an unorderly shutdown may occur, when the transport link (socket channel)
* is severed before close messages are exchanged. This may happen by getting an -1 or {@link IOException}
* when trying to read from the socket channel, or an {@link IOException} when trying to write to it.
* In both cases {@link SSLEngine#closeInbound()} should be called and then try to follow the standard procedure.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
protected void handleEndOfStream(SocketChannel socketChannel, SSLEngine engine) throws IOException {
try {
engine.closeInbound();
} catch (Exception e) {
log.error("This engine was forced to close inbound, without having received the proper SSL/TLS close notification message from the peer, due to end of stream.");
}
closeConnection(socketChannel, engine);
}
protected String peerAppDataAsString() {
return new String(Arrays.copyOf(peerAppData.array(), peerAppData.limit()));
}
}

View File

@ -0,0 +1,263 @@
package net.corda.node.amqp;
import net.corda.nodeapi.internal.protonwrapper.netty.SSLHelperKt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.time.Duration;
import java.util.Iterator;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManagerFactory;
/**
* An SSL/TLS server, that will listen to a specific address and port and serve SSL/TLS connections
* compatible with the protocol it applies.
* <p/>
* After initialization {@link NioSslServer#start()} should be called so the server starts to listen to
* new connection requests. At this point, start is blocking, so, in order to be able to gracefully stop
* the server, a {@link Runnable} containing a server object should be created. This runnable should
* start the server in its run method and also provide a stop method, which will call {@link NioSslServer#stop()}.
* </p>
* NioSslServer makes use of Java NIO, and specifically listens to new connection requests with a {@link ServerSocketChannel}, which will
* create new {@link SocketChannel}s and a {@link Selector} which serves all the connections in one thread.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class NioSslServer extends NioSslPeer {
private final Duration handshakeDelay;
/**
* Declares if the server is active to serve and create new connections.
*/
private boolean active;
/**
* The context will be initialized with a specific SSL/TLS protocol and will then be used
* to create {@link SSLEngine} classes for each new connection that arrives to the server.
*/
private final SSLContext context;
/**
* A part of Java NIO that will be used to serve all connections to the server in one thread.
*/
private final Selector selector;
/**
* Server is designed to apply an SSL/TLS protocol and listen to an IP address and port.
*
* @param hostAddress - the IP address this server will listen to.
* @param port - the port this server will listen to.
* @param handshakeDelay - if not [null] specifies for how long the handshake should be delayed
*/
public NioSslServer(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, String hostAddress, int port,
Duration handshakeDelay) throws Exception {
context = SSLHelperKt.createAndInitSslContext(keyManagerFactory, trustManagerFactory);
SSLSession dummySession = context.createSSLEngine().getSession();
myAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
myNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
peerAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
peerNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
dummySession.invalidate();
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
this.handshakeDelay = handshakeDelay;
active = true;
}
/**
* Should be called in order the server to start listening to new connections.
* This method will run in a loop as long as the server is active. In order to stop the server
* you should use {@link NioSslServer#stop()} which will set it to inactive state
* and also wake up the listener, which may be in blocking select() state.
*/
public void start() throws Exception {
log.debug("Initialized and waiting for new connections...");
while (isActive()) {
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read((SocketChannel) key.channel(), (SSLEngine) key.attachment());
}
}
}
log.debug("Goodbye!");
}
/**
* Sets the server to an inactive state, in order to exit the reading loop in {@link NioSslServer#start()}
* and also wakes up the selector, which may be in select() blocking state.
*/
public void stop() {
log.debug("Will now close server...");
active = false;
executor.shutdown();
selector.wakeup();
}
/**
* Will be called after a new connection request arrives to the server. Creates the {@link SocketChannel} that will
* be used as the network layer link, and the {@link SSLEngine} that will encrypt and decrypt all the data
* that will be exchanged during the session with this specific client.
*
* @param key - the key dedicated to the {@link ServerSocketChannel} used by the server to listen to new connection requests.
*/
private void accept(SelectionKey key) throws Exception {
log.debug("New connection request!");
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
SSLEngine engine = context.createSSLEngine();
engine.setUseClientMode(false);
// Demand client to present its certificate
engine.setNeedClientAuth(true);
engine.beginHandshake();
if (handshakeDelay != null) {
log.info("Deliberately sleeping during handshake for: " + handshakeDelay);
Thread.sleep(handshakeDelay.toMillis());
}
if (doHandshake(socketChannel, engine)) {
socketChannel.register(selector, SelectionKey.OP_READ, engine);
} else {
socketChannel.close();
log.debug("Connection closed due to handshake failure.");
}
}
/**
* Will be called by the selector when the specific socket channel has data to be read.
* As soon as the server reads these data, it will call {@link NioSslServer#write(SocketChannel, SSLEngine, String)}
* to send back a trivial response.
*
* @param socketChannel - the transport link used between the two peers.
* @param engine - the engine used for encryption/decryption of the data exchanged between the two peers.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void read(SocketChannel socketChannel, SSLEngine engine) throws IOException {
log.debug("About to read from a client...");
peerNetData.clear();
int bytesRead = socketChannel.read(peerNetData);
if (bytesRead > 0) {
peerNetData.flip();
while (peerNetData.hasRemaining()) {
peerAppData.clear();
SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
switch (result.getStatus()) {
case OK:
peerAppData.flip();
log.debug("Incoming message: " + peerAppDataAsString());
break;
case BUFFER_OVERFLOW:
peerAppData = enlargeApplicationBuffer(engine, peerAppData);
break;
case BUFFER_UNDERFLOW:
peerNetData = handleBufferUnderflow(engine, peerNetData);
break;
case CLOSED:
log.debug("Client wants to close connection...");
closeConnection(socketChannel, engine);
log.debug("Goodbye client!");
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
write(socketChannel, engine, "Hello! I am your server!");
} else if (bytesRead < 0) {
log.error("Received end of stream. Will try to close connection with client...");
handleEndOfStream(socketChannel, engine);
log.debug("Goodbye client!");
}
}
/**
* Will send a message back to a client.
*
* @param message - the message to be sent.
* @throws IOException if an I/O error occurs to the socket channel.
*/
@Override
protected void write(SocketChannel socketChannel, SSLEngine engine, String message) throws IOException {
log.debug("About to write to a client...");
myAppData.clear();
myAppData.put(message.getBytes());
myAppData.flip();
while (myAppData.hasRemaining()) {
// The loop has a meaning for (outgoing) messages larger than 16KB.
// Every wrap call will remove 16KB from the original message and send it to the remote peer.
myNetData.clear();
SSLEngineResult result = engine.wrap(myAppData, myNetData);
switch (result.getStatus()) {
case OK:
myNetData.flip();
while (myNetData.hasRemaining()) {
socketChannel.write(myNetData);
}
log.debug("Message sent to the client: " + message);
break;
case BUFFER_OVERFLOW:
myNetData = enlargePacketBuffer(engine, myNetData);
break;
case BUFFER_UNDERFLOW:
throw new SSLException("Buffer underflow occurred after a wrap. I don't think we should ever get here.");
case CLOSED:
closeConnection(socketChannel, engine);
return;
default:
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
}
}
}
/**
* Determines if the the server is active or not.
*
* @return if the server is active or not.
*/
public boolean isActive() {
return active;
}
}

View File

@ -0,0 +1,69 @@
package net.corda.node.amqp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.time.Duration;
/**
* This class provides a runnable that can be used to initialize a {@link NioSslServer} thread.
* <p/>
* Run starts the server, which will start listening to the configured IP address and port for
* new SSL/TLS connections and serve the ones already connected to it.
* <p/>
* Also a stop method is provided in order to gracefully close the server and stop the thread.
*
* @author <a href="mailto:alex.a.karnezis@gmail.com">Alex Karnezis</a>
*/
public class ServerThread implements AutoCloseable {
private final static Logger log = LoggerFactory.getLogger(ServerThread.class);
private static final long JOIN_TIMEOUT_MS = 10000;
private final NioSslServer server;
private Thread serverThread;
public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port) throws Exception {
this(keyManagerFactory, trustManagerFactory, port, null);
}
public ServerThread(KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory, int port, @Nullable Duration handshakeDelay) throws Exception {
server = new NioSslServer(keyManagerFactory, trustManagerFactory, "localhost", port, handshakeDelay);
}
public void start() {
Runnable serverRunnable = () -> {
try {
server.start();
} catch (Exception e) {
log.error("Exception starting server", e);
}
};
serverThread = new Thread(serverRunnable, this.getClass().getSimpleName() + "-ServerThread");
serverThread.start();
}
/**
* Should be called in order to gracefully stop the server.
*/
public void stop() throws InterruptedException {
server.stop();
serverThread.join(JOIN_TIMEOUT_MS);
}
public boolean isActive() {
return server.isActive();
}
@Override
public void close() throws Exception {
stop();
}
}

View File

@ -0,0 +1,208 @@
package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.init
import net.corda.nodeapi.internal.protonwrapper.netty.initialiseTrustStoreAndEnableCrlChecking
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.driver.internal.incrementalPortAllocation
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.test.assertFalse
import kotlin.test.assertTrue
/**
* This test verifies some edge case scenarios like handshake timeouts when [AMQPClient] connected to the server
*
* In order to have control over handshake internals a simple TLS server is created which may have a configurable handshake delay.
*/
@RunWith(Parameterized::class)
class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
companion object {
private const val MAX_MESSAGE_SIZE = 10 * 1024
private val log = contextLogger()
@JvmStatic
@Parameterized.Parameters(name = "iteration = {0}")
fun iterations(): Iterable<Array<Int>> {
// It is possible to change this value to a greater number
// to ensure that the test is not flaking when executed on CI
val repsCount = 1
return (1..repsCount).map { arrayOf(it) }
}
}
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val portAllocation = incrementalPortAllocation()
private lateinit var serverKeyManagerFactory: KeyManagerFactory
private lateinit var serverTrustManagerFactory: TrustManagerFactory
private lateinit var clientKeyManagerFactory: KeyManagerFactory
private lateinit var clientTrustManagerFactory: TrustManagerFactory
private lateinit var clientAmqpConfig: AMQPConfiguration
@Before
fun setup() {
setupServerCertificates()
setupClientCertificates()
}
private fun setupServerCertificates() {
val baseDirectory = temporaryFolder.root.toPath() / "server"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val serverConfig = mock<NodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(ALICE_NAME).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
}
serverConfig.configureWithDevSSLCertificate()
val keyStore = serverConfig.p2pSslOptions.keyStore.get()
val serverAmqpConfig = object : AMQPConfiguration {
override val keyStore = keyStore
override val trustStore = serverConfig.p2pSslOptions.trustStore.get()
override val revocationConfig = true.toRevocationConfig()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
}
serverKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
serverTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
serverKeyManagerFactory.init(keyStore)
serverTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(serverAmqpConfig.trustStore, serverAmqpConfig.revocationConfig))
}
private fun setupClientCertificates() {
val baseDirectory = temporaryFolder.root.toPath() / "client"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val clientConfig = mock<NodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(true).whenever(it).crlCheckSoftFail
}
clientConfig.configureWithDevSSLCertificate()
//val nodeCert = (signingCertificateStore to p2pSslConfiguration).recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint)
val keyStore = clientConfig.p2pSslOptions.keyStore.get()
clientAmqpConfig = object : AMQPConfiguration {
override val keyStore = keyStore
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
override val sslHandshakeTimeout: Long = 3000
}
clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
clientTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
clientKeyManagerFactory.init(keyStore)
clientTrustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(clientAmqpConfig.trustStore, clientAmqpConfig.revocationConfig))
}
@Test(timeout = 300_000)
fun trivialClientServerExchange() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort).also { it.start() }
//System.setProperty("javax.net.debug", "all");
serverThread.use {
val client = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
client.connect()
client.write("Hello! I am a client!")
client.read()
client.shutdown()
val client2 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
val client3 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
val client4 = NioSslClient(clientKeyManagerFactory, clientTrustManagerFactory, "localhost", serverPort)
client2.connect()
client2.write("Hello! I am another client!")
client2.read()
client2.shutdown()
client3.connect()
client4.connect()
client3.write("Hello from client3!!!")
client4.write("Hello from client4!!!")
client3.read()
client4.read()
client3.shutdown()
client4.shutdown()
}
assertFalse(serverThread.isActive)
}
@Test(timeout = 300_000)
fun amqpClientServerConnect() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort)
.also { it.start() }
serverThread.use {
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertTrue(clientConnect.connected)
log.info("Confirmed connected")
}
}
assertFalse(serverThread.isActive)
}
@Test(timeout = 300_000)
fun amqpClientServerHandshakeTimeout() {
val serverPort = portAllocation.nextPort()
val serverThread = ServerThread(serverKeyManagerFactory, serverTrustManagerFactory, serverPort, 5.seconds)
.also { it.start() }
serverThread.use {
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), clientAmqpConfig)
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertFalse(clientConnect.connected)
// Not a badCert, but a timeout during handshake
assertFalse(clientConnect.badCert)
}
}
assertFalse(serverThread.isActive)
}
}

View File

@ -1,8 +1,10 @@
package net.corda.node.services.network
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
@ -11,6 +13,7 @@ import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.testing.common.internal.addNotary
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
@ -74,7 +77,6 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
)
}
@Before
fun start() {
networkMapServer = NetworkMapServer(cacheTimeout, portAllocation.nextHostAndPort())
@ -142,6 +144,102 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
}
}
@Test(timeout = 300_000)
fun `Can hotload parameters if the notary changes`() {
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
notarySpecs = emptyList()
) {
val notary: Party = TestIdentity.fresh("test notary").party
val oldParams = networkMapServer.networkParameters
val paramsWithNewNotary = oldParams.copy(
epoch = 3,
modifiedTime = Instant.ofEpochMilli(random63BitValue())).addNotary(notary)
val alice = startNodeAndRunFlagDay(paramsWithNewNotary)
eventually { assertEquals(paramsWithNewNotary, alice.rpc.networkParameters) }
}
}
@Test(timeout = 300_000)
fun `If only the notary changes but parameters were not accepted, the node will still shut down on the flag day`() {
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
notarySpecs = emptyList()
) {
val notary: Party = TestIdentity.fresh("test notary").party
val oldParams = networkMapServer.networkParameters
val paramsWithNewNotary = oldParams.copy(
epoch = 3,
modifiedTime = Instant.ofEpochMilli(random63BitValue())).addNotary(notary)
val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
networkMapServer.scheduleParametersUpdate(paramsWithNewNotary, "Next parameters", Instant.ofEpochMilli(random63BitValue()))
// Wait for network map client to poll for the next update.
Thread.sleep(cacheTimeout.toMillis() * 2)
networkMapServer.advertiseNewParameters()
eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
}
}
@Test(timeout = 300_000)
fun `Can not hotload parameters if non-hotloadable parameter changes and the node will shut down`() {
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
notarySpecs = emptyList()
) {
val oldParams = networkMapServer.networkParameters
val paramsWithUpdatedMaxMessageSize = oldParams.copy(
epoch = 3,
modifiedTime = Instant.ofEpochMilli(random63BitValue()),
maxMessageSize = oldParams.maxMessageSize + 1)
val alice = startNodeAndRunFlagDay(paramsWithUpdatedMaxMessageSize)
eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
}
}
@Test(timeout = 300_000)
fun `Can not hotload parameters if notary and a non-hotloadable parameter changes and the node will shut down`() {
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
notarySpecs = emptyList()
) {
val oldParams = networkMapServer.networkParameters
val notary: Party = TestIdentity.fresh("test notary").party
val paramsWithUpdatedMaxMessageSizeAndNotary = oldParams.copy(
epoch = 3,
modifiedTime = Instant.ofEpochMilli(random63BitValue()),
maxMessageSize = oldParams.maxMessageSize + 1).addNotary(notary)
val alice = startNodeAndRunFlagDay(paramsWithUpdatedMaxMessageSizeAndNotary)
eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
}
}
private fun DriverDSLImpl.startNodeAndRunFlagDay(newParams: NetworkParameters): NodeHandleInternal {
val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
val nextHash = newParams.serialize().hash
networkMapServer.scheduleParametersUpdate(newParams, "Next parameters", Instant.ofEpochMilli(random63BitValue()))
// Wait for network map client to poll for the next update.
Thread.sleep(cacheTimeout.toMillis() * 2)
alice.rpc.acceptNewNetworkParameters(nextHash)
assertEquals(nextHash, networkMapServer.latestParametersAccepted(alice.nodeInfo.legalIdentities.first().owningKey))
assertEquals(networkMapServer.networkParameters, alice.rpc.networkParameters)
networkMapServer.advertiseNewParameters()
return alice
}
@Test(timeout=300_000)
fun `nodes process additions and removals from the network map correctly (and also download the network parameters)`() {
internalDriver(

View File

@ -33,13 +33,19 @@ class FlowVersioningTest : NodeBasedTest() {
private class PretendInitiatingCoreFlow(val initiatedParty: Party) : FlowLogic<Pair<Int, Int>>() {
@Suspendable
override fun call(): Pair<Int, Int> {
// Execute receive() outside of the Pair constructor to avoid Kotlin/Quasar instrumentation bug.
val session = initiateFlow(initiatedParty)
val alicePlatformVersionAccordingToBob = session.receive<Int>().unwrap { it }
return Pair(
alicePlatformVersionAccordingToBob,
session.getCounterpartyFlowInfo().flowVersion
)
return try {
// Get counterparty flow info before we receive Alice's data, to ensure the flow is still open
val bobPlatformVersionAccordingToAlice = session.getCounterpartyFlowInfo().flowVersion
// Execute receive() outside of the Pair constructor to avoid Kotlin/Quasar instrumentation bug.
val alicePlatformVersionAccordingToBob = session.receive<Int>().unwrap { it }
Pair(
alicePlatformVersionAccordingToBob,
bobPlatformVersionAccordingToAlice
)
} finally {
session.close()
}
}
}

View File

@ -109,6 +109,8 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NetworkParameterUpdateListener
import net.corda.node.services.network.NetworkParametersHotloader
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
@ -524,6 +526,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
Node.printBasicNodeInfo("CorDapp schemas synchronised")
}
@Suppress("ComplexMethod")
open fun start(): S {
check(started == null) { "Node has already been started" }
@ -549,7 +552,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
startShell()
networkMapClient?.start(trustRoot)
val (netParams, signedNetParams) = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory).read()
val networkParametersReader = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory)
val (netParams, signedNetParams) = networkParametersReader.read()
log.info("Loaded network parameters: $netParams")
check(netParams.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
@ -570,13 +574,27 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val (nodeInfo, signedNodeInfo) = nodeInfoAndSigned
identityService.ourNames = nodeInfo.legalIdentities.map { it.name }.toSet()
services.start(nodeInfo, netParams)
val networkParametersHotloader = if (networkMapClient == null) {
null
} else {
NetworkParametersHotloader(networkMapClient, trustRoot, netParams, networkParametersReader, networkParametersStorage).also {
it.addNotaryUpdateListener(networkMapCache)
it.addNotaryUpdateListener(identityService)
it.addNetworkParametersChangedListeners(services)
it.addNetworkParametersChangedListeners(networkMapUpdater)
}
}
networkMapUpdater.start(
trustRoot,
signedNetParams.raw.hash,
signedNodeInfo,
netParams,
keyManagementService,
configuration.networkParameterAcceptanceSettings!!)
configuration.networkParameterAcceptanceSettings!!,
networkParametersHotloader)
try {
startMessagingService(rpcOps, nodeInfo, myNotaryIdentity, netParams)
} catch (e: Exception) {
@ -1221,7 +1239,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
}
inner class ServiceHubInternalImpl : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution {
inner class ServiceHubInternalImpl : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution, NetworkParameterUpdateListener {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val identityService: IdentityService get() = this@AbstractNode.identityService
@ -1254,6 +1272,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache get() = this@AbstractNode.attachmentsClassLoaderCache
@Volatile
private lateinit var _networkParameters: NetworkParameters
override val networkParameters: NetworkParameters get() = _networkParameters
@ -1340,6 +1359,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val ledgerTransaction = servicesForResolution.specialise(ltx)
return verifierFactoryService.apply(ledgerTransaction)
}
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
this._networkParameters = networkParameters
}
}
}

View File

@ -12,6 +12,7 @@ import net.corda.core.internal.CertRole
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.hash
import net.corda.core.internal.toSet
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
@ -19,6 +20,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.network.NotaryUpdateListener
import net.corda.node.services.persistence.PublicKeyHashToExternalId
import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache
import net.corda.node.utilities.AppendOnlyPersistentMap
@ -53,7 +55,8 @@ import kotlin.streams.toList
* cached for efficient lookup.
*/
@ThreadSafe
class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), IdentityServiceInternal {
@Suppress("TooManyFunctions")
class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), IdentityServiceInternal, NotaryUpdateListener {
companion object {
private val log = contextLogger()
@ -197,7 +200,8 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
override val trustAnchor: TrustAnchor get() = _trustAnchor
/** Stores notary identities obtained from the network parameters, for which we don't need to perform a database lookup. */
private val notaryIdentityCache = HashSet<Party>()
@Volatile
private var notaryIdentityCache = HashSet<Party>()
// CordaPersistence is not a c'tor parameter to work around the cyclic dependency
lateinit var database: CordaPersistence
@ -453,4 +457,8 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
keys
}
}
override fun onNewNotaryList(notaries: List<NotaryInfo>) {
notaryIdentityCache = HashSet(notaries.map { it.identity })
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.network
import com.google.common.util.concurrent.MoreExecutors
import net.corda.cliutils.ExitCodes
import net.corda.core.CordaRuntimeException
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
@ -62,7 +63,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val baseDirectory: Path,
private val extraNetworkMapKeys: List<UUID>,
private val networkParametersStorage: NetworkParametersStorage
) : AutoCloseable {
) : AutoCloseable, NetworkParameterUpdateListener {
companion object {
private val logger = contextLogger()
private val defaultRetryInterval = 1.minutes
@ -77,12 +78,15 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcherSubscription = AtomicReference<Subscription?>()
private var autoAcceptNetworkParameters: Boolean = true
private lateinit var trustRoot: X509Certificate
@Volatile
private lateinit var currentParametersHash: SecureHash
private lateinit var ourNodeInfo: SignedNodeInfo
private lateinit var ourNodeInfoHash: SecureHash
private lateinit var networkParameters: NetworkParameters
private lateinit var keyManagementService: KeyManagementService
private lateinit var excludedAutoAcceptNetworkParameters: Set<String>
private var networkParametersHotloader: NetworkParametersHotloader? = null
override fun close() {
fileWatcherSubscription.updateAndGet { subscription ->
@ -95,13 +99,15 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}
MoreExecutors.shutdownAndAwaitTermination(networkMapPoller, 50, TimeUnit.SECONDS)
}
@Suppress("LongParameterList")
fun start(trustRoot: X509Certificate,
currentParametersHash: SecureHash,
ourNodeInfo: SignedNodeInfo,
networkParameters: NetworkParameters,
keyManagementService: KeyManagementService,
networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings) {
networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings,
networkParametersHotloader: NetworkParametersHotloader?
) {
fileWatcherSubscription.updateAndGet { subscription ->
require(subscription == null) { "Should not call this method twice" }
this.trustRoot = trustRoot
@ -112,6 +118,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
this.keyManagementService = keyManagementService
this.autoAcceptNetworkParameters = networkParameterAcceptanceSettings.autoAcceptEnabled
this.excludedAutoAcceptNetworkParameters = networkParameterAcceptanceSettings.excludedAutoAcceptableParameters
this.networkParametersHotloader = networkParametersHotloader
val autoAcceptNetworkParametersNames = autoAcceptablePropertyNames - excludedAutoAcceptNetworkParameters
if (autoAcceptNetworkParameters && autoAcceptNetworkParametersNames.isNotEmpty()) {
@ -180,7 +188,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val additionalHashes = getPrivateNetworkNodeHashes(version)
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
exitOnParametersMismatch(globalNetworkMap)
hotloadOrExitOnParametersMismatch(globalNetworkMap)
}
// Calculate any nodes that are now gone and remove _only_ them from the cache
// NOTE: We won't remove them until after the add/update cycle as only then will we definitely know which nodes are no longer
@ -276,22 +284,26 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}
}
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
private fun hotloadOrExitOnParametersMismatch(networkMap: NetworkMap) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
0
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
val newParameterHash = networkMap.networkParameterHash
val nodeAcceptedNewParameters = updatesFile.exists() && newParameterHash == updatesFile.readObject<SignedNetworkParameters>().raw.hash
if (!nodeAcceptedNewParameters) {
logger.error(
"""Node is using network parameters with hash $currentParametersHash but the network map is advertising ${networkMap.networkParameterHash}.
To resolve this mismatch, and move to the current parameters, delete the $NETWORK_PARAMS_FILE_NAME file from the node's directory and restart.
The node will shutdown now.""")
1
exitProcess(ExitCodes.FAILURE)
}
exitProcess(exitCode)
val hotloadSucceeded = networkParametersHotloader!!.attemptHotload(newParameterHash)
if (!hotloadSucceeded) {
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
exitProcess(ExitCodes.SUCCESS)
}
currentParametersHash = newParameterHash
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
@ -340,6 +352,10 @@ The node will shutdown now.""")
throw OutdatedNetworkParameterHashException(parametersHash, newParametersHash)
}
}
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
this.networkParameters = networkParameters
}
}
private val memberPropertyPartition = NetworkParameters::class.declaredMemberProperties.partition { it.isAutoAcceptable() }
@ -360,8 +376,8 @@ internal fun NetworkParameters.canAutoAccept(newNetworkParameters: NetworkParame
private fun KProperty1<out NetworkParameters, Any?>.isAutoAcceptable(): Boolean = findAnnotation<AutoAcceptable>() != null
private fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParameters, getter: Method?): Boolean {
internal fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParameters, getter: Method?): Boolean {
val propertyValue = getter?.invoke(this)
val newPropertyValue = getter?.invoke(newNetworkParameters)
return propertyValue != newPropertyValue
}
}

View File

@ -0,0 +1,11 @@
package net.corda.node.services.network
import net.corda.core.node.NetworkParameters
/**
* When network parameters change on a flag day, onNewNetworkParameters will be invoked with the new parameters.
* Used inside {@link net.corda.node.services.network.NetworkParametersUpdater}
*/
interface NetworkParameterUpdateListener {
fun onNewNetworkParameters(networkParameters: NetworkParameters)
}

View File

@ -0,0 +1,88 @@
package net.corda.node.services.network
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.NetworkParametersReader
import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
import java.security.cert.X509Certificate
import kotlin.reflect.full.declaredMemberProperties
import kotlin.reflect.jvm.javaGetter
/**
* This class is responsible for hotloading new network parameters or shut down the node if it's not possible.
* Currently only hotloading notary changes are supported.
*/
class NetworkParametersHotloader(private val networkMapClient: NetworkMapClient,
private val trustRoot: X509Certificate,
@Volatile private var networkParameters: NetworkParameters,
private val networkParametersReader: NetworkParametersReader,
private val networkParametersStorage: NetworkParametersStorage) {
companion object {
private val logger = contextLogger()
private val alwaysHotloadable = listOf(NetworkParameters::epoch, NetworkParameters::modifiedTime)
}
private val networkParameterUpdateListeners = mutableListOf<NetworkParameterUpdateListener>()
private val notaryUpdateListeners = mutableListOf<NotaryUpdateListener>()
fun addNetworkParametersChangedListeners(listener: NetworkParameterUpdateListener) {
networkParameterUpdateListeners.add(listener)
}
fun addNotaryUpdateListener(listener: NotaryUpdateListener) {
notaryUpdateListeners.add(listener)
}
private fun notifyListenersFor(notaries: List<NotaryInfo>) = notaryUpdateListeners.forEach { it.onNewNotaryList(notaries) }
private fun notifyListenersFor(networkParameters: NetworkParameters) = networkParameterUpdateListeners.forEach { it.onNewNetworkParameters(networkParameters) }
fun attemptHotload(newNetworkParameterHash: SecureHash): Boolean {
val newSignedNetParams = networkMapClient.getNetworkParameters(newNetworkParameterHash)
val newNetParams = newSignedNetParams.verifiedNetworkParametersCert(trustRoot)
if (canHotload(newNetParams)) {
logger.info("All changed parameters are hotloadable")
hotloadParameters(newNetParams)
return true
} else {
return false
}
}
/**
* Ignoring always hotloadable properties (epoch, modifiedTime) return true if the notary is the only property that is different in the new network parameters
*/
private fun canHotload(newNetworkParameters: NetworkParameters): Boolean {
val propertiesChanged = NetworkParameters::class.declaredMemberProperties
.minus(alwaysHotloadable)
.filter { networkParameters.valueChanged(newNetworkParameters, it.javaGetter) }
logger.info("Updated NetworkParameters properties: $propertiesChanged")
val noPropertiesChanged = propertiesChanged.isEmpty()
val onlyNotariesChanged = propertiesChanged == listOf(NetworkParameters::notaries)
return when {
noPropertiesChanged -> true
onlyNotariesChanged -> true
else -> false
}
}
/**
* Update local networkParameters and currentParametersHash with new values.
* Notify all listeners for network parameter changes
*/
private fun hotloadParameters(newNetworkParameters: NetworkParameters) {
networkParameters = newNetworkParameters
val networkParametersAndSigned = networkParametersReader.read()
networkParametersStorage.setCurrentParameters(networkParametersAndSigned.signed, trustRoot)
notifyListenersFor(newNetworkParameters)
notifyListenersFor(newNetworkParameters.notaries)
}
}

View File

@ -0,0 +1,11 @@
package net.corda.node.services.network
import net.corda.core.node.NotaryInfo
/**
* When notaries inside network parameters change on a flag day, onNewNotaryList will be invoked with the new notary list.
* Used inside {@link net.corda.node.services.network.NetworkParametersUpdater}
*/
interface NotaryUpdateListener {
fun onNewNotaryList(notaries: List<NotaryInfo>)
}

View File

@ -38,9 +38,10 @@ import javax.persistence.PersistenceException
/** Database-based network map cache. */
@ThreadSafe
@Suppress("TooManyFunctions")
open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private val database: CordaPersistence,
private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken(), NotaryUpdateListener {
companion object {
private val logger = contextLogger()
@ -53,6 +54,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
override val nodeReady: OpenFuture<Void?> = openFuture()
@Volatile
private lateinit var notaries: List<NotaryInfo>
override val notaryIdentities: List<Party> get() = notaries.map { it.identity }
@ -386,4 +388,8 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
for (nodeInfo in result) session.remove(nodeInfo)
}
}
override fun onNewNotaryList(notaries: List<NotaryInfo>) {
this.notaries = notaries
}
}

View File

@ -139,7 +139,7 @@ sealed class Event {
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
/**
* Signals the faiure of a [FlowAsyncOperation].
* Signals the failure of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
@ -179,6 +179,13 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow"
}
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -41,47 +41,18 @@ class StartedFlowTransition(
continuation = FlowContinuation.Throw(errorsToThrow[0])
)
}
val sessionsToBeTerminated = findSessionsToBeTerminated(startingState)
// if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition.
return if (sessionsToBeTerminated.isNotEmpty()) {
terminateSessions(sessionsToBeTerminated)
} else {
when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult {
return builder {
val sessionsToRemove = sessionsToBeTerminated.keys
val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove)
.removeSessionsToBeClosed(sessionsToRemove)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessionsToRemove))
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))
FlowContinuation.ProcessEvents
}
return when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}.let { scheduleTerminateSessionsIfRequired(it) }
}
private fun waitForSessionConfirmationsTransition(): TransitionResult {
@ -158,6 +129,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun sendAndReceiveTransition(flowIORequest: FlowIORequest.SendAndReceive): TransitionResult {
val sessionIdToMessage = LinkedHashMap<SessionId, SerializedBytes<Any>>()
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -171,18 +143,23 @@ class StartedFlowTransition(
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -216,6 +193,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult {
return builder {
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -224,11 +202,16 @@ class StartedFlowTransition(
}
// send initialises to uninitialised sessions
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -253,6 +236,8 @@ class StartedFlowTransition(
val messages: Map<SessionId, SerializedBytes<Any>>,
val newSessionMap: SessionMap
)
@Suppress("ComplexMethod", "NestedBlockDepth")
private fun pollSessionMessages(sessions: SessionMap, sessionIds: Set<SessionId>): PollResult? {
val newSessionMessages = LinkedHashMap(sessions)
val resultMessages = LinkedHashMap<SessionId, SerializedBytes<Any>>()
@ -267,7 +252,11 @@ class StartedFlowTransition(
} else {
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")
} else {
(messages[0] as DataSessionMessage).payload
}
}
}
else -> {
@ -537,4 +526,25 @@ class StartedFlowTransition(
private fun executeForceCheckpoint(): TransitionResult {
return builder { resumeFlowLogic(Unit) }
}
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
// If there are sessions to be closed, close them on a following transition
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
return if (sessionsToBeTerminated.isNotEmpty()) {
transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
} else {
transition
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
}

View File

@ -62,6 +62,7 @@ class TopLevelTransition(
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
}
}
@ -366,4 +367,16 @@ class TopLevelTransition(
resumeFlowLogic(Unit)
}
}
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
return builder {
val sessions = event.sessions
val newCheckpoint = currentState.checkpoint
.removeSessions(sessions)
.removeSessionsToBeClosed(sessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessions))
FlowContinuation.ProcessEvents
}
}
}

View File

@ -6,6 +6,8 @@ import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByService
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.packageName
import net.corda.core.node.AppServiceHub
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.CordaService
@ -16,12 +18,20 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.cordapp.DummyRPCFlow
import net.corda.testing.core.BOC_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.MockServices
import net.corda.testing.node.StartedMockNode
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -100,6 +110,22 @@ class CordaServiceTest {
nodeA.services.cordaService(EntityManagerService::class.java)
}
@Test(timeout=300_000)
fun `MockServices when initialized with package name not on classpath throws ClassNotFoundException`() {
val cordappPackages = listOf(
"com.r3.corda.sdk.tokens.money",
"net.corda.finance.contracts",
CashSchemaV1::class.packageName,
DummyLinearStateSchemaV1::class.packageName)
val bankOfCorda = TestIdentity(BOC_NAME)
val dummyCashIssuer = TestIdentity(CordaX500Name("Snake Oil Issuer", "London", "GB"), 10)
val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20)
val identityService = makeTestIdentityService(dummyNotary.identity)
Assertions.assertThatThrownBy { MockServices(cordappPackages, dummyNotary, identityService, dummyCashIssuer.keyPair, bankOfCorda.keyPair) }
.isInstanceOf(ClassNotFoundException::class.java).hasMessage("Could not create jar file as the given package is not found on the classpath: com.r3.corda.sdk.tokens.money")
}
@StartableByService
class DummyServiceFlow : FlowLogic<InvocationContext>() {
companion object {

View File

@ -78,7 +78,6 @@ class NetworkMapUpdaterTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val cacheExpiryMs = 1000
private val privateNetUUID = UUID.randomUUID()
private val fs = Jimfs.newFileSystem(unix())
@ -120,12 +119,13 @@ class NetworkMapUpdaterTest {
networkParameters: NetworkParameters = server.networkParameters,
autoAcceptNetworkParameters: Boolean = true,
excludedAutoAcceptNetworkParameters: Set<String> = emptySet()) {
updater!!.start(DEV_ROOT_CA.certificate,
server.networkParameters.serialize().hash,
ourNodeInfo,
networkParameters,
MockKeyManagementService(makeTestIdentityService(), ourKeyPair),
NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters))
NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters), null)
}
@Test(timeout=300_000)

View File

@ -0,0 +1,125 @@
package net.corda.node.services.network
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.verify
import net.corda.core.identity.Party
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.serialization.serialize
import net.corda.coretesting.internal.DEV_ROOT_CA
import net.corda.node.internal.NetworkParametersReader
import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.testing.common.internal.addNotary
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import org.junit.Assert
import org.junit.Rule
import org.junit.Test
import org.mockito.Mockito
class NetworkParametersHotloaderTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa()
private val trustRoot = DEV_ROOT_CA.certificate
private val originalNetworkParameters = testNetworkParameters()
private val notary: Party = TestIdentity.fresh("test notary").party
private val networkParametersWithNotary = originalNetworkParameters.addNotary(notary)
private val networkParametersStorage = Mockito.mock(NetworkParametersStorage::class.java)
@Test(timeout = 300_000)
fun `can hotload if notary changes`() {
`can hotload`(networkParametersWithNotary)
}
@Test(timeout = 300_000)
fun `can not hotload if notary changes but another non-hotloadable property also changes`() {
val newnetParamsWithNewNotaryAndMaxMsgSize = networkParametersWithNotary.copy(maxMessageSize = networkParametersWithNotary.maxMessageSize + 1)
`can not hotload`(newnetParamsWithNewNotaryAndMaxMsgSize)
}
@Test(timeout = 300_000)
fun `can hotload if only always hotloadable properties change`() {
val newParametersWithAlwaysHotloadableProperties = originalNetworkParameters.copy(epoch = originalNetworkParameters.epoch + 1, modifiedTime = originalNetworkParameters.modifiedTime.plusSeconds(60))
`can hotload`(newParametersWithAlwaysHotloadableProperties)
}
@Test(timeout = 300_000)
fun `can not hotload if maxMessageSize changes`() {
val parametersWithNewMaxMessageSize = originalNetworkParameters.copy(maxMessageSize = originalNetworkParameters.maxMessageSize + 1)
`can not hotload`(parametersWithNewMaxMessageSize)
}
@Test(timeout = 300_000)
fun `can not hotload if maxTransactionSize changes`() {
val parametersWithNewMaxTransactionSize = originalNetworkParameters.copy(maxTransactionSize = originalNetworkParameters.maxMessageSize + 1)
`can not hotload`(parametersWithNewMaxTransactionSize)
}
@Test(timeout = 300_000)
fun `can not hotload if minimumPlatformVersion changes`() {
val parametersWithNewMinimumPlatformVersion = originalNetworkParameters.copy(minimumPlatformVersion = originalNetworkParameters.minimumPlatformVersion + 1)
`can not hotload`(parametersWithNewMinimumPlatformVersion)
}
private fun `can hotload`(newNetworkParameters: NetworkParameters) {
val notaryUpdateListener = Mockito.spy(object : NotaryUpdateListener {
override fun onNewNotaryList(notaries: List<NotaryInfo>) {
}
})
val networkParametersChangedListener = Mockito.spy(object : NetworkParameterUpdateListener {
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
}
})
val networkParametersHotloader = createHotloaderWithMockedServices(newNetworkParameters).also {
it.addNotaryUpdateListener(notaryUpdateListener)
it.addNetworkParametersChangedListeners(networkParametersChangedListener)
}
Assert.assertTrue(networkParametersHotloader.attemptHotload(newNetworkParameters.serialize().hash))
verify(notaryUpdateListener).onNewNotaryList(newNetworkParameters.notaries)
verify(networkParametersChangedListener).onNewNetworkParameters(newNetworkParameters)
}
private fun `can not hotload`(newNetworkParameters: NetworkParameters) {
val notaryUpdateListener = Mockito.spy(object : NotaryUpdateListener {
override fun onNewNotaryList(notaries: List<NotaryInfo>) {
}
})
val networkParametersChangedListener = Mockito.spy(object : NetworkParameterUpdateListener {
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
}
})
val networkParametersHotloader = createHotloaderWithMockedServices(newNetworkParameters).also {
it.addNotaryUpdateListener(notaryUpdateListener)
it.addNetworkParametersChangedListeners(networkParametersChangedListener)
}
Assert.assertFalse(networkParametersHotloader.attemptHotload(newNetworkParameters.serialize().hash))
verify(notaryUpdateListener, never()).onNewNotaryList(any());
verify(networkParametersChangedListener, never()).onNewNetworkParameters(any());
}
private fun createHotloaderWithMockedServices(newNetworkParameters: NetworkParameters): NetworkParametersHotloader {
val signedNetworkParameters = networkMapCertAndKeyPair.sign(newNetworkParameters)
val networkMapClient = Mockito.mock(NetworkMapClient::class.java)
Mockito.`when`(networkMapClient.getNetworkParameters(newNetworkParameters.serialize().hash)).thenReturn(signedNetworkParameters)
val networkParametersReader = Mockito.mock(NetworkParametersReader::class.java)
Mockito.`when`(networkParametersReader.read())
.thenReturn(NetworkParametersReader.NetworkParametersAndSigned(signedNetworkParameters, trustRoot))
return NetworkParametersHotloader(networkMapClient, trustRoot, originalNetworkParameters, networkParametersReader, networkParametersStorage)
}
}

View File

@ -97,7 +97,7 @@ class RetryFlowMockTest {
}
@Test(timeout=300_000)
fun `Restart does not set senderUUID`() {
fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {

View File

@ -37,11 +37,6 @@ data class CustomCordapp(
val signingInfo: SigningInfo? = null,
override val config: Map<String, Any> = emptyMap()
) : TestCordappInternal() {
init {
require(packages.isNotEmpty() || classes.isNotEmpty() || fixups.isNotEmpty()) {
"At least one package or class must be specified"
}
}
override val jarFile: Path get() = getJarFile(this)
@ -55,7 +50,7 @@ data class CustomCordapp(
@VisibleForTesting
internal fun packageAsJar(file: Path) {
val classGraph = ClassGraph()
if (packages.isNotEmpty()) {
if(packages.isNotEmpty()){
classGraph.whitelistPaths(*packages.map { it.replace('.', '/') }.toTypedArray())
}
if (classes.isNotEmpty()) {
@ -78,6 +73,10 @@ data class CustomCordapp(
}
}
if (scanResult.allResources.isEmpty()){
throw ClassNotFoundException("Could not create jar file as the given package is not found on the classpath: ${packages.toList()[0]}")
}
// The same resource may be found in different locations (this will happen when running from gradle) so just
// pick the first one found.
scanResult.allResources.asMap().forEach { path, resourceList ->
@ -178,8 +177,8 @@ data class CustomCordapp(
val jarFile = cordappsDirectory.createDirectories() / filename
if (it.fixups.isNotEmpty()) {
it.createFixupJar(jarFile)
} else {
it.packageAsJar(jarFile)
} else if(it.packages.isNotEmpty() || it.classes.isNotEmpty() || it.fixups.isNotEmpty()) {
it.packageAsJar(jarFile)
}
it.signJar(jarFile)
logger.debug { "$it packaged into $jarFile" }

View File

@ -30,14 +30,17 @@ abstract class TestCordappInternal : TestCordapp() {
// Precedence is given to node-specific CorDapps
val allCordapps = nodeSpecificCordapps + generalCordapps.filter { it.withOnlyJarContents() !in nodeSpecificCordappsWithoutMeta }
// Ignore any duplicate jar files
val jarToCordapp = allCordapps.associateBy { it.jarFile }
val jarToCordapp = allCordapps.filter {
it !is CustomCordapp || it.packages.isNotEmpty() || it.classes.isNotEmpty() || it.fixups.isNotEmpty() }.associateBy { it.jarFile }
val cordappsDir = baseDirectory / "cordapps"
val configDir = (cordappsDir / "config").createDirectories()
jarToCordapp.forEach { jar, cordapp ->
try {
jar.copyToDirectory(cordappsDir)
if (jar.toFile().exists()) {
jar.copyToDirectory(cordappsDir)
}
} catch (e: FileAlreadyExistsException) {
// Ignore if the node already has the same CorDapp jar. This can happen if the node is being restarted.
}

View File

@ -117,7 +117,7 @@ class NetworkMapServer(private val pollInterval: Duration,
// Mapping from the UUID of the network (null for global one) to hashes of the nodes in network
private val networkMaps = mutableMapOf<UUID?, MutableSet<SecureHash>>()
val latestAcceptedParametersMap = mutableMapOf<PublicKey, SecureHash>()
private val signedNetParams by lazy { networkMapCertAndKeyPair.sign(networkParameters) }
private val signedNetParams get() = networkMapCertAndKeyPair.sign(networkParameters)
@POST
@Path("publish")

View File

@ -22,7 +22,7 @@ import org.junit.Test
*/
class TestResponseFlowInIsolation {
private val network: MockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = cordappsForPackages("com.template")))
private val network: MockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = cordappsForPackages()))
private val a = network.createNode()
private val b = network.createNode()

View File

@ -28,7 +28,7 @@ import static org.hamcrest.Matchers.instanceOf;
*/
public class TestResponseFlowInIsolationInJava {
private final MockNetwork network = new MockNetwork(new MockNetworkParameters().withCordappsForAllNodes(cordappsForPackages("com.template")));
private final MockNetwork network = new MockNetwork(new MockNetworkParameters().withCordappsForAllNodes(cordappsForPackages()));
private final StartedMockNode a = network.createNode();
private final StartedMockNode b = network.createNode();

View File

@ -1,96 +0,0 @@
package net.corda.tools.shell
import co.paralleluniverse.fibers.Suspendable
import com.google.common.io.Files
import com.jcraft.jsch.ChannelExec
import com.jcraft.jsch.JSch
import com.jcraft.jsch.Session
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.Permissions
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import org.bouncycastle.util.io.Streams
import org.junit.Test
import kotlin.test.assertTrue
class HashLookupCommandTest {
@Test(timeout=300_000)
fun `hash lookup command returns correct response`() {
val user = User("u", "p", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP))) {
val nodeFuture = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true)
val node = nodeFuture.getOrThrow()
val txId = issueTransaction(node)
val txIdHashed = txId.sha256()
testCommand(user, node, command = "hashLookup $txId", expected = "Found a matching transaction with Id: $txId")
testCommand(user, node, command = "hashLookup $txIdHashed", expected = "Found a matching transaction with Id: $txId")
testCommand(user, node, command = "hashLookup ${SecureHash.randomSHA256()}", expected = "No matching transaction found")
}
}
private fun testCommand(user: User, node: NodeHandle, command: String, expected: String) {
val session = connectToShell(user, node)
val channel = session.openChannel("exec") as ChannelExec
channel.setCommand(command)
channel.connect(5000)
assertTrue(channel.isConnected)
val response = String(Streams.readAll(channel.inputStream))
val matchFound = response.lines().any { line ->
line.contains(expected)
}
channel.disconnect()
assertTrue(matchFound)
session.disconnect()
}
private fun connectToShell(user: User, node: NodeHandle): Session {
val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(),
user = user.username, password = user.password,
hostAndPort = node.rpcAddress,
sshdPort = 2224)
InteractiveShell.startShell(conf)
InteractiveShell.nodeInfo()
val session = JSch().getSession("u", "localhost", 2224)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p")
session.connect()
assertTrue(session.isConnected)
return session
}
private fun issueTransaction(node: NodeHandle): SecureHash {
return node.rpc.startFlow(::DummyIssue).returnValue.get()
}
}
@StartableByRPC
internal class DummyIssue : FlowLogic<SecureHash>() {
@Suspendable
override fun call(): SecureHash {
val me = serviceHub.myInfo.legalIdentities.first().ref(0)
val fakeNotary = me.party
val builder = DummyContract.generateInitial(1, fakeNotary as Party, me)
val stx = serviceHub.signInitialTransaction(builder)
serviceHub.recordTransactions(stx)
return stx.id
}
}

View File

@ -1,6 +1,7 @@
package net.corda.tools.shell;
import net.corda.core.crypto.SecureHash;
import net.corda.core.internal.VisibleForTesting;
import net.corda.core.messaging.CordaRPCOps;
import net.corda.core.messaging.StateMachineTransactionMapping;
import org.crsh.cli.Argument;
@ -13,13 +14,14 @@ import org.crsh.text.Decoration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
@Named("hashLookup")
public class HashLookupShellCommand extends InteractiveShellCommand {
private static Logger logger = LoggerFactory.getLogger(HashLookupShellCommand.class);
final private String manualText ="Checks if a transaction matching a specified Id hash value is recorded on this node.\n\n" +
private static final String manualText ="Checks if a transaction matching a specified Id hash value is recorded on this node.\n\n" +
"Both the transaction Id and the hashed value of a transaction Id (as returned by the Notary in case of a double-spend) is a valid input.\n" +
"This is mainly intended to be used for troubleshooting notarisation issues when a\n" +
"state is claimed to be already consumed by another transaction.\n\n" +
@ -29,25 +31,32 @@ public class HashLookupShellCommand extends InteractiveShellCommand {
@Man(manualText)
public void main(@Usage("A transaction Id or a hexadecimal SHA-256 hash value representing the hashed transaction Id") @Argument(unquote = false) String txIdHash) {
CordaRPCOps proxy = ops();
try {
hashLookup(out, proxy, txIdHash);
} catch (IllegalArgumentException ex) {
out.println(manualText);
out.println(ex.getMessage(), Decoration.bold, Color.red);
}
}
@VisibleForTesting
protected static void hashLookup(PrintWriter out, CordaRPCOps proxy, String txIdHash) throws IllegalArgumentException {
logger.info("Executing command \"hashLookup\".");
if (txIdHash == null) {
out.println(manualText);
out.println("Please provide a hexadecimal transaction Id hash value or a transaction Id", Decoration.bold, Color.red);
return;
throw new IllegalArgumentException("Please provide a hexadecimal transaction Id hash value or a transaction Id");
}
CordaRPCOps proxy = ops();
List<StateMachineTransactionMapping> mapping = proxy.stateMachineRecordedTransactionMappingSnapshot();
SecureHash txIdHashParsed;
try {
txIdHashParsed = SecureHash.parse(txIdHash);
} catch (IllegalArgumentException e) {
out.println("The provided string is not a valid hexadecimal SHA-256 hash value", Decoration.bold, Color.red);
return;
throw new IllegalArgumentException("The provided string is not a valid hexadecimal SHA-256 hash value");
}
List<StateMachineTransactionMapping> mapping = proxy.stateMachineRecordedTransactionMappingSnapshot();
Optional<SecureHash> match = mapping.stream()
.map(StateMachineTransactionMapping::getTransactionId)
.filter(
@ -59,7 +68,7 @@ public class HashLookupShellCommand extends InteractiveShellCommand {
SecureHash found = match.get();
out.println("Found a matching transaction with Id: " + found.toString());
} else {
out.println("No matching transaction found", Decoration.bold, Color.red);
throw new IllegalArgumentException("No matching transaction found");
}
}
}

View File

@ -0,0 +1,67 @@
package net.corda.tools.shell
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineTransactionMapping
import org.hamcrest.MatcherAssert
import org.hamcrest.core.StringContains
import org.junit.Test
import org.mockito.Mockito
import java.io.CharArrayWriter
import java.io.PrintWriter
import java.util.UUID
import kotlin.test.assertFailsWith
class HashLookupCommandTest {
companion object {
private val DEFAULT_TXID: SecureHash = SecureHash.randomSHA256()
private fun ops(vararg txIds: SecureHash): CordaRPCOps? {
val snapshot: List<StateMachineTransactionMapping> = txIds.map { txId ->
StateMachineTransactionMapping(StateMachineRunId(UUID.randomUUID()), txId)
}
return Mockito.mock(CordaRPCOps::class.java).apply {
Mockito.`when`(stateMachineRecordedTransactionMappingSnapshot()).thenReturn(snapshot)
}
}
private fun runCommand(ops: CordaRPCOps?, txIdHash: String): String {
val arrayWriter = CharArrayWriter()
return PrintWriter(arrayWriter).use {
HashLookupShellCommand.hashLookup(it, ops, txIdHash)
it.flush()
arrayWriter.toString()
}
}
}
@Test(timeout=300_000)
fun `hash lookup command returns correct response`() {
val ops = ops(DEFAULT_TXID)
var response = runCommand(ops, DEFAULT_TXID.toString())
MatcherAssert.assertThat(response, StringContains.containsString("Found a matching transaction with Id: $DEFAULT_TXID"))
// Verify the hash of the TX ID also works
response = runCommand(ops, DEFAULT_TXID.sha256().toString())
MatcherAssert.assertThat(response, StringContains.containsString("Found a matching transaction with Id: $DEFAULT_TXID"))
}
@Test(timeout=300_000)
fun `should reject invalid txid`() {
val ops = ops(DEFAULT_TXID)
assertFailsWith<IllegalArgumentException>("The provided string is not a valid hexadecimal SHA-256 hash value") {
runCommand(ops, "abcdefgh")
}
}
@Test(timeout=300_000)
fun `should reject unknown txid`() {
val ops = ops(DEFAULT_TXID)
assertFailsWith<IllegalArgumentException>("No matching transaction found") {
runCommand(ops, SecureHash.randomSHA256().toString())
}
}
}