From 0619b4ce901ca57e44b02f70ff649f70426adde5 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Fri, 24 Jan 2020 12:02:16 +0000 Subject: [PATCH 01/17] add --- .../migration/node-core.changelog-v17.xml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 node/src/main/resources/migration/node-core.changelog-v17.xml diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml new file mode 100644 index 0000000000..4293beef3f --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -0,0 +1,19 @@ +<?xml version="1.1" encoding="UTF-8" standalone="no"?> +<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" + logicalFilePath="migration/node-services.changelog-init.xml"> + + <changeSet author="R3.Corda" id="drop_existing_checkpoints_table"> + <dropTable tableName="node_checkpoints"/> + </changeSet> + + <changeSet author="R3.Corda" id="1511451595465-7"> + <createTable tableName="node_checkpoints"> + <column name="checkpoint_id" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="checkpoint_value" type="blob"/> + </createTable> + </changeSet> +</databaseChangeLog> \ No newline at end of file From 5750c39348eacfa37ce7a8584e61c615aecb174b Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Fri, 24 Jan 2020 13:24:08 +0000 Subject: [PATCH 02/17] start work on liquibase schema for new checkpoints --- Jenkinsfile | 2 +- .../migration/node-core.changelog-master.xml | 4 + .../node-core.changelog-v17-keys.xml | 34 +++++ .../node-core.changelog-v17-postgres.xml | 139 ++++++++++++++++++ .../migration/node-core.changelog-v17.xml | 128 +++++++++++++++- 5 files changed, 302 insertions(+), 5 deletions(-) create mode 100644 node/src/main/resources/migration/node-core.changelog-v17-keys.xml create mode 100644 node/src/main/resources/migration/node-core.changelog-v17-postgres.xml diff --git a/Jenkinsfile b/Jenkinsfile index dcd0522239..4d41a7eb2f 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -5,7 +5,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) pipeline { - agent { label 'k8s' } + agent { label 'aks' } options { timestamps() } environment { diff --git a/node/src/main/resources/migration/node-core.changelog-master.xml b/node/src/main/resources/migration/node-core.changelog-master.xml index 28842e0825..8c5a7916e1 100644 --- a/node/src/main/resources/migration/node-core.changelog-master.xml +++ b/node/src/main/resources/migration/node-core.changelog-master.xml @@ -31,4 +31,8 @@ <include file="migration/node-core.changelog-v14-data.xml"/> + <include file="migration/node-core.changelog-v17.xml"/> + <include file="migration/node-core.changelog-v17-postgres.xml"/> + <include file="migration/node-core.changelog-v17-keys.xml"/> + </databaseChangeLog> diff --git a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml new file mode 100644 index 0000000000..3253f2ed22 --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml @@ -0,0 +1,34 @@ +<?xml version="1.1" encoding="UTF-8" standalone="no"?> +<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" + logicalFilePath="migration/node-services.changelog-init.xml"> + + <changeSet author="R3.Corda" id="add_new_checkpoint_schema_primary_keys"> + <addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints"/> + <addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/> + <addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/> + <addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/> + <addPrimaryKey columnNames="invocation_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys"> + <addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints" + constraintName="node_checkpoint_blob_id_to_blob_table_fk" + referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/> + + <addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints" + constraintName="node_checkpoints_to_exceptions_fk" + referencedColumnNames="id" referencedTableName="node_flow_exceptions"/> + + <addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints" + constraintName="node_checkpoint_to_result_fk" + referencedColumnNames="id" referencedTableName="node_flow_results"/> + + <addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_flow_metadata" + constraintName="node_metadata_to_checkpoints_fk" + referencedColumnNames="flow_id" referencedTableName="node_checkpoints"/> + + </changeSet> + +</databaseChangeLog> \ No newline at end of file diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml new file mode 100644 index 0000000000..d850c57f83 --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -0,0 +1,139 @@ +<?xml version="1.1" encoding="UTF-8" standalone="no"?> +<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" + logicalFilePath="migration/node-services.changelog-init.xml"> + + <changeSet author="R3.Corda" id="drop_existing_checkpoints_table" dbms="postgresql"> + <dropTable tableName="node_checkpoints"/> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="postgresql"> + <createTable tableName="node_checkpoints"> + <column name="flow_id" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="checkpoint_blob_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="result_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="error_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="status" type="NVARCHAR(16)"> + <constraints nullable="false"/> + </column> + <column name="compatible" type="BOOLEAN"> + <constraints nullable="false"/> + </column> + <column name="progress_step" type="NVARCHAR(32)"> + <constraints nullable="true"/> + </column> + <column name="flow_io_request" type="NVARCHAR(32)"> + <constraints nullable="true"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_checkpoint_blobs_table" dbms="postgresql"> + <createTable tableName="node_checkpoint_blobs"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="checkpoint_value" type="varbinary(33554432)"> + <constraints nullable="false"/> + </column> + <column name="flow_state" type="varbinary(33554432)"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="hmac" type="NVARCHAR(32)"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + + <changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="postgresql"> + <createTable tableName="node_flow_results"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="result_value" type="varbinary(33554432)"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_flow_exceptions_table" dbms="postgresql"> + <createTable tableName="node_flow_exceptions"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="type" type="NVARCHAR(128)"> + <constraints nullable="false"/> + </column> + <column name="exception_value" type="varbinary(33554432)"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="postgresql"> + <createTable tableName="node_flow_metadata"> + <column name="invocation_id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="flow_id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="flow_name" type="NVARCHAR(128)"> + <constraints nullable="false"/> + </column> + <column name="flow_identifier" type="NVARCHAR(128)"> + <constraints nullable="true"/> + </column> + <column name="started_type" type="NVARCHAR(32)"> + <constraints nullable="false"/> + </column> + <column name="flow_parameters" type="varbinary(33554432)"> + <constraints nullable="false"/> + </column> + <column name="cordapp_name" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="platform_version" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="rpc_user" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="invocation_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="received_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="start_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="true"/> + </column> + <column name="finish_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="true"/> + </column> + </createTable> + </changeSet> + +</databaseChangeLog> \ No newline at end of file diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 4293beef3f..77dd65848b 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -4,16 +4,136 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" logicalFilePath="migration/node-services.changelog-init.xml"> - <changeSet author="R3.Corda" id="drop_existing_checkpoints_table"> + <changeSet author="R3.Corda" id="drop_existing_checkpoints_table" dbms="!postgresql"> <dropTable tableName="node_checkpoints"/> </changeSet> - <changeSet author="R3.Corda" id="1511451595465-7"> + <changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="!postgresql"> <createTable tableName="node_checkpoints"> - <column name="checkpoint_id" type="NVARCHAR(64)"> + <column name="flow_id" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="checkpoint_blob_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="result_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="error_id" type="BIGINT"> + <constraints nullable="true"/> + </column> + <column name="status" type="NVARCHAR(16)"> + <constraints nullable="false"/> + </column> + <column name="compatible" type="BOOLEAN"> + <constraints nullable="false"/> + </column> + <column name="progress_step" type="NVARCHAR(32)"> + <constraints nullable="true"/> + </column> + <column name="flow_io_request" type="NVARCHAR(32)"> + <constraints nullable="true"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> <constraints nullable="false"/> </column> - <column name="checkpoint_value" type="blob"/> </createTable> </changeSet> + + <changeSet author="R3.Corda" id="add_new_checkpoint_blob_table" dbms="!postgresql"> + <createTable tableName="node_checkpoint_blobs"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="checkpoint_value" type="blob"> + <constraints nullable="false"/> + </column> + <column name="flow_state" type="blob"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="hmac" type="NVARCHAR(32)"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + + <changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="!postgresql"> + <createTable tableName="node_flow_results"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="result_value" type="blob"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_flow_exceptions_table" dbms="!postgresql"> + <createTable tableName="node_flow_exceptions"> + <column name="id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="type" type="NVARCHAR(128)"> + <constraints nullable="false"/> + </column> + <column name="exception_value" type="blob"> + <constraints nullable="false"/> + </column> + <column name="timestamp" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + </createTable> + </changeSet> + + <changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql"> + <createTable tableName="node_flow_metadata"> + <column name="invocation_id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="flow_id" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="flow_name" type="NVARCHAR(128)"> + <constraints nullable="false"/> + </column> + <column name="flow_identifier" type="NVARCHAR(128)"> + <constraints nullable="true"/> + </column> + <column name="started_type" type="NVARCHAR(32)"> + <constraints nullable="false"/> + </column> + <column name="flow_parameters" type="blob"> + <constraints nullable="false"/> + </column> + <column name="cordapp_name" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="platform_version" type="BIGINT"> + <constraints nullable="false"/> + </column> + <column name="rpc_user" type="NVARCHAR(64)"> + <constraints nullable="false"/> + </column> + <column name="invocation_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="received_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="false"/> + </column> + <column name="start_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="true"/> + </column> + <column name="finish_time" type="java.sql.Types.TIMESTAMP"> + <constraints nullable="true"/> + </column> + </createTable> + </changeSet> + </databaseChangeLog> \ No newline at end of file From 3b0c1b98a1ee551aba9888ff1baa36fd8431d15c Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Sun, 26 Jan 2020 18:44:03 +0000 Subject: [PATCH 03/17] switch to new version of plugin --- .ci/dev/localStorageClass.yml | 5 +++++ build.gradle | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 .ci/dev/localStorageClass.yml diff --git a/.ci/dev/localStorageClass.yml b/.ci/dev/localStorageClass.yml new file mode 100644 index 0000000000..f380c51dbe --- /dev/null +++ b/.ci/dev/localStorageClass.yml @@ -0,0 +1,5 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: testing-storage +provisioner: microk8s.io/hostpath \ No newline at end of file diff --git a/build.gradle b/build.gradle index bdb66ed4ec..e5dcc0f818 100644 --- a/build.gradle +++ b/build.gradle @@ -183,7 +183,7 @@ buildscript { // Capsule gradle plugin forked and maintained locally to support Gradle 5.x // See https://github.com/corda/gradle-capsule-plugin classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" - classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-SNAPSHOT", changing: true + classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-SNAPSHOT", changing: true classpath "com.bmuschko:gradle-docker-plugin:5.0.0" } } From d9f638260ca805881e689f27301bd5e8257dbbe0 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Sun, 26 Jan 2020 19:01:49 +0000 Subject: [PATCH 04/17] change plugin version to force refresh --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index e5dcc0f818..724ea70799 100644 --- a/build.gradle +++ b/build.gradle @@ -183,7 +183,7 @@ buildscript { // Capsule gradle plugin forked and maintained locally to support Gradle 5.x // See https://github.com/corda/gradle-capsule-plugin classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" - classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-SNAPSHOT", changing: true + classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8s-SNAPSHOT", changing: true classpath "com.bmuschko:gradle-docker-plugin:5.0.0" } } From 6435a132e49269679cb63248d5b08dd9cca89ae7 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Sun, 26 Jan 2020 20:27:19 +0000 Subject: [PATCH 05/17] try using pre-populated cache --- Jenkinsfile | 8 ++++---- build.gradle | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index dcd0522239..6fd3f917a6 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -5,7 +5,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) pipeline { - agent { label 'k8s' } + agent { label 'local-k8s' } options { timestamps() } environment { @@ -24,7 +24,7 @@ pipeline { "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " clean pushBuildImage preAllocateForAllParallelIntegrationTest preAllocateForAllParallelUnitTest --stacktrace" + " clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace" } sh "kubectl auth can-i get pods" } @@ -42,7 +42,7 @@ pipeline { "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${CHANGE_TARGET}\" " + - " deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace" + " allParallelIntegrationTest --stacktrace" } } stage('Unit Tests') { @@ -55,7 +55,7 @@ pipeline { "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${CHANGE_TARGET}\" " + - " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace" + " allParallelUnitTest --stacktrace" } } } diff --git a/build.gradle b/build.gradle index 724ea70799..804f3897b7 100644 --- a/build.gradle +++ b/build.gradle @@ -183,7 +183,7 @@ buildscript { // Capsule gradle plugin forked and maintained locally to support Gradle 5.x // See https://github.com/corda/gradle-capsule-plugin classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" - classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8s-SNAPSHOT", changing: true + classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true classpath "com.bmuschko:gradle-docker-plugin:5.0.0" } } From 8c6904e9ef2efc24c852f4076a90ec8bcfb6af3f Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Sun, 26 Jan 2020 21:11:59 +0000 Subject: [PATCH 06/17] do not preallocate --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 6fd3f917a6..0b1039cb5d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -24,7 +24,7 @@ pipeline { "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace" + " clean pushBuildImage --stacktrace" } sh "kubectl auth can-i get pods" } From 4465c15d396d3d1e700e1e3fa38fb61888668cef Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Sun, 26 Jan 2020 21:16:55 +0000 Subject: [PATCH 07/17] no daemon --- Jenkinsfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 0b1039cb5d..4c679de085 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -19,7 +19,7 @@ pipeline { stage('Corda Pull Request - Generate Build Image') { steps { withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) { - sh "./gradlew " + + sh "./gradlew --no-daemon " + "-Dkubenetize=true " + "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + @@ -34,7 +34,7 @@ pipeline { parallel { stage('Integration Tests') { steps { - sh "./gradlew " + + sh "./gradlew --no-daemon " + "-DbuildId=\"\${BUILD_ID}\" " + "-Dkubenetize=true " + "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " + @@ -47,7 +47,7 @@ pipeline { } stage('Unit Tests') { steps { - sh "./gradlew " + + sh "./gradlew --no-daemon " + "-DbuildId=\"\${BUILD_ID}\" " + "-Dkubenetize=true " + "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " + From 7af363d1a6ace73507c5b57e8c649a160deb188a Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 27 Jan 2020 09:14:27 +0000 Subject: [PATCH 08/17] reduce cores per fork to increase parallelism --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 804f3897b7..f12fb23445 100644 --- a/build.gradle +++ b/build.gradle @@ -634,7 +634,7 @@ task allParallelIntegrationTest(type: ParallelTestGroup) { testGroups "integrationTest" numberOfShards 10 streamOutput false - coresPerFork 5 + coresPerFork 4 memoryInGbPerFork 12 distribute DistributeTestsBy.METHOD nodeTaints "big" From 47c14673c404bb707994c12e77093aee9cb6b8cc Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 27 Jan 2020 11:20:13 +0000 Subject: [PATCH 09/17] address initial review comments --- .../resources/migration/node-core.changelog-v17-postgres.xml | 4 ++-- node/src/main/resources/migration/node-core.changelog-v17.xml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index d850c57f83..13ea6dab2d 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -28,7 +28,7 @@ <column name="compatible" type="BOOLEAN"> <constraints nullable="false"/> </column> - <column name="progress_step" type="NVARCHAR(32)"> + <column name="progress_step" type="NVARCHAR(256)"> <constraints nullable="true"/> </column> <column name="flow_io_request" type="NVARCHAR(32)"> @@ -98,7 +98,7 @@ <constraints nullable="false"/> </column> <column name="flow_id" type="BIGINT"> - <constraints nullable="false"/> + <constraints nullable="true"/> </column> <column name="flow_name" type="NVARCHAR(128)"> <constraints nullable="false"/> diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 77dd65848b..8cfad5f8b2 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -28,7 +28,7 @@ <column name="compatible" type="BOOLEAN"> <constraints nullable="false"/> </column> - <column name="progress_step" type="NVARCHAR(32)"> + <column name="progress_step" type="NVARCHAR(256)"> <constraints nullable="true"/> </column> <column name="flow_io_request" type="NVARCHAR(32)"> @@ -98,7 +98,7 @@ <constraints nullable="false"/> </column> <column name="flow_id" type="BIGINT"> - <constraints nullable="false"/> + <constraints nullable="true"/> </column> <column name="flow_name" type="NVARCHAR(128)"> <constraints nullable="false"/> From a3bc624e16e4af527f177164119e1154b9bd70a3 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 27 Jan 2020 12:19:14 +0000 Subject: [PATCH 10/17] add message to exceptions table --- .../resources/migration/node-core.changelog-v17-postgres.xml | 3 +++ node/src/main/resources/migration/node-core.changelog-v17.xml | 3 +++ 2 files changed, 6 insertions(+) diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index 13ea6dab2d..44c9a73583 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -86,6 +86,9 @@ <column name="exception_value" type="varbinary(33554432)"> <constraints nullable="false"/> </column> + <column name="exception_message" type="NVARCHAR(512)"> + <constraints nullable="false"/> + </column> <column name="timestamp" type="java.sql.Types.TIMESTAMP"> <constraints nullable="false"/> </column> diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 8cfad5f8b2..66c5ff7bfb 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -86,6 +86,9 @@ <column name="exception_value" type="blob"> <constraints nullable="false"/> </column> + <column name="exception_message" type="NVARCHAR(512)"> + <constraints nullable="false"/> + </column> <column name="timestamp" type="java.sql.Types.TIMESTAMP"> <constraints nullable="false"/> </column> From 93623d73175e30fbf54d34282e09b606bd6255b8 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 27 Jan 2020 12:56:36 +0000 Subject: [PATCH 11/17] even more aggresive cpu allocation --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index f12fb23445..1228d771bf 100644 --- a/build.gradle +++ b/build.gradle @@ -634,7 +634,7 @@ task allParallelIntegrationTest(type: ParallelTestGroup) { testGroups "integrationTest" numberOfShards 10 streamOutput false - coresPerFork 4 + coresPerFork 2 memoryInGbPerFork 12 distribute DistributeTestsBy.METHOD nodeTaints "big" @@ -644,7 +644,7 @@ task allParallelUnitTest(type: ParallelTestGroup) { testGroups "test" numberOfShards 10 streamOutput false - coresPerFork 3 + coresPerFork 2 memoryInGbPerFork 12 distribute DistributeTestsBy.CLASS nodeTaints "small" From 78d83e9583d307346914277b7f7afc4be173ca90 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Tue, 28 Jan 2020 16:55:14 +0000 Subject: [PATCH 12/17] start adding hibernate entities --- .../persistence/DBCheckpointStorage.kt | 30 +++++++++++++++++++ .../migration/node-core.changelog-master.xml | 6 ++-- .../node-core.changelog-v17-postgres.xml | 4 +-- .../migration/node-core.changelog-v17.xml | 4 +-- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index b1eec763f6..1ffa860b0d 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -25,6 +25,36 @@ import java.sql.SQLException class DBCheckpointStorage : CheckpointStorage { val log: Logger = LoggerFactory.getLogger(this::class.java) + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") + class DBFlowCheckpoint( + + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") + class DBFlowCheckpointBlob( + + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results") + class DBFlowResult( + + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions") + class DBFlowException( + + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata") + class DBFlowMetadata( + + ) + @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") class DBCheckpoint( diff --git a/node/src/main/resources/migration/node-core.changelog-master.xml b/node/src/main/resources/migration/node-core.changelog-master.xml index 8c5a7916e1..b320960f02 100644 --- a/node/src/main/resources/migration/node-core.changelog-master.xml +++ b/node/src/main/resources/migration/node-core.changelog-master.xml @@ -31,8 +31,8 @@ <include file="migration/node-core.changelog-v14-data.xml"/> - <include file="migration/node-core.changelog-v17.xml"/> - <include file="migration/node-core.changelog-v17-postgres.xml"/> - <include file="migration/node-core.changelog-v17-keys.xml"/> +<!-- <include file="migration/node-core.changelog-v17.xml"/>--> +<!-- <include file="migration/node-core.changelog-v17-postgres.xml"/>--> +<!-- <include file="migration/node-core.changelog-v17-keys.xml"/>--> </databaseChangeLog> diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index 44c9a73583..5c4960b7a1 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -100,13 +100,13 @@ <column name="invocation_id" type="BIGINT"> <constraints nullable="false"/> </column> - <column name="flow_id" type="BIGINT"> + <column name="flow_id" type="NVARCHAR(64)"> <constraints nullable="true"/> </column> <column name="flow_name" type="NVARCHAR(128)"> <constraints nullable="false"/> </column> - <column name="flow_identifier" type="NVARCHAR(128)"> + <column name="flow_identifier" type="NVARCHAR(512)"> <constraints nullable="true"/> </column> <column name="started_type" type="NVARCHAR(32)"> diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 66c5ff7bfb..5143e69574 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -100,13 +100,13 @@ <column name="invocation_id" type="BIGINT"> <constraints nullable="false"/> </column> - <column name="flow_id" type="BIGINT"> + <column name="flow_id" type="NVARCHAR(64)"> <constraints nullable="true"/> </column> <column name="flow_name" type="NVARCHAR(128)"> <constraints nullable="false"/> </column> - <column name="flow_identifier" type="NVARCHAR(128)"> + <column name="flow_identifier" type="NVARCHAR(512)"> <constraints nullable="true"/> </column> <column name="started_type" type="NVARCHAR(32)"> From 7b3da954561021c8fc7c80ef0ae9517ca258ad5c Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Wed, 29 Jan 2020 17:10:34 +0000 Subject: [PATCH 13/17] flesh out entities for new checkpointing --- .../net/corda/core/internal/FlowIORequest.kt | 11 ++- .../persistence/DBCheckpointStorage.kt | 69 +++++++++++++++++-- .../node/services/statemachine/FlowMonitor.kt | 2 +- .../statemachine/FlowStateMachineImpl.kt | 2 +- .../migration/node-core.changelog-v17.xml | 5 +- 5 files changed, 80 insertions(+), 9 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt index 494c5099aa..0d54a4715a 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt @@ -45,6 +45,7 @@ sealed class FlowIORequest<out R : Any> { * @property shouldRetrySend specifies whether the send should be retried. * @return a map from session to received message. */ + //net.corda.core.internal.FlowIORequest.SendAndReceive data class SendAndReceive( val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>, val shouldRetrySend: Boolean @@ -80,7 +81,15 @@ sealed class FlowIORequest<out R : Any> { /** * Suspend the flow until all Initiating sessions are confirmed. */ - object WaitForSessionConfirmations : FlowIORequest<Unit>() + class WaitForSessionConfirmations : FlowIORequest<Unit>() { + override fun equals(other: Any?): Boolean { + return this === other + } + + override fun hashCode(): Int { + return System.identityHashCode(this) + } + } /** * Execute the specified [operation], suspend the flow until completion. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 1ffa860b0d..c739a4876e 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -1,7 +1,9 @@ package net.corda.node.services.persistence import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.debug import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint @@ -16,8 +18,13 @@ import javax.persistence.Column import javax.persistence.Entity import javax.persistence.Id import org.hibernate.annotations.Type +import java.math.BigInteger import java.sql.Connection import java.sql.SQLException +import java.time.Instant +import javax.persistence.FetchType +import javax.persistence.JoinColumn +import javax.persistence.OneToOne /** * Simple checkpoint key value storage in DB. @@ -25,16 +32,71 @@ import java.sql.SQLException class DBCheckpointStorage : CheckpointStorage { val log: Logger = LoggerFactory.getLogger(this::class.java) + enum class FlowStatus { + RUNNABLE, + FAILED, + COMPLETED, + HOSPITALIZED, + KILLED, + PAUSED + } + @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") class DBFlowCheckpoint( + @Id + @Column(name = "flow_id", length = 64, nullable = false) + private var id: String? = null, + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "id") + private var blob: DBFlowCheckpointBlob? = null, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "id") + private var result: DBFlowResult? = null, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "id") + private var exceptionDetails: DBFlowException? = null, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "flow_id") + private var flowMetadata: DBFlowMetadata? = null, + + @Column(name = "status") + private var status: FlowStatus? = null, + + @Column(name = "compatible") + private var compatible: Boolean? = null, + + @Column(name = "progress_step") + private var progressStep: String? = null, + + @Column(name = "flow_io_request") + private val ioRequestType: Class<FlowIORequest<*>>? = null, + + @Column(name = "timestamp") + private val checkpointInstant: Instant? = null ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") class DBFlowCheckpointBlob( + @Id + @Column(name = "id", nullable = false) + private var id: BigInteger? = null, + @Type(type = "corda-blob") + @Column(name = "checkpoint_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + + @Type(type = "corda-blob") + @Column(name = "flow_state", nullable = false) + var flowStack: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "timestamp") + private val instant: Instant? = null ) @Entity @@ -65,10 +127,10 @@ class DBCheckpointStorage : CheckpointStorage { @Type(type = "corda-blob") @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY ) { - override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" - } + override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" + } override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) { currentDBSession().save(DBCheckpoint().apply { @@ -86,7 +148,6 @@ class DBCheckpointStorage : CheckpointStorage { }) } - override fun removeCheckpoint(id: StateMachineRunId): Boolean { val session = currentDBSession() val criteriaBuilder = session.criteriaBuilder diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index b947f62f2b..9f80005880 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -81,7 +81,7 @@ internal class FlowMonitor( is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" - FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" + is FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 4a9a407473..2bbdae6ba3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -269,7 +269,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId, Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader val result = logic.call() - suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true) + suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true) Try.Success(result) } catch (t: Throwable) { if(t.isUnrecoverable()) { diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 5143e69574..70dfa0f3f3 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -22,7 +22,7 @@ <column name="error_id" type="BIGINT"> <constraints nullable="true"/> </column> - <column name="status" type="NVARCHAR(16)"> + <column name="status" type="BIGINT"> <constraints nullable="false"/> </column> <column name="compatible" type="BOOLEAN"> @@ -31,7 +31,8 @@ <column name="progress_step" type="NVARCHAR(256)"> <constraints nullable="true"/> </column> - <column name="flow_io_request" type="NVARCHAR(32)"> + <!-- net.corda.core.internal.FlowIORequest.SendAndReceive --> + <column name="flow_io_request" type="NVARCHAR(128)"> <constraints nullable="true"/> </column> <column name="timestamp" type="java.sql.Types.TIMESTAMP"> From 2b079bd92ea61241cbbe05e59742465d7919646c Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Thu, 30 Jan 2020 12:17:26 +0000 Subject: [PATCH 14/17] working node startup with new tables and entities --- .../persistence/DBCheckpointStorage.kt | 101 +++++++++++++++--- .../node/services/schema/NodeSchemaService.kt | 8 ++ .../migration/node-core.changelog-master.xml | 6 +- .../node-core.changelog-v17-keys.xml | 10 +- .../node-core.changelog-v17-postgres.xml | 27 +++-- .../migration/node-core.changelog-v17.xml | 14 +-- 6 files changed, 118 insertions(+), 48 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index c739a4876e..e2449568e7 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -18,6 +18,7 @@ import javax.persistence.Column import javax.persistence.Entity import javax.persistence.Id import org.hibernate.annotations.Type +import java.lang.Exception import java.math.BigInteger import java.sql.Connection import java.sql.SQLException @@ -41,43 +42,47 @@ class DBCheckpointStorage : CheckpointStorage { PAUSED } + enum class StartReason { + RPC, FLOW, SERVICE, SCHEDULED, INITIATED + } + @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new") class DBFlowCheckpoint( @Id @Column(name = "flow_id", length = 64, nullable = false) - private var id: String? = null, + var id: String? = null, @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "id") - private var blob: DBFlowCheckpointBlob? = null, + @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") + var blob: DBFlowCheckpointBlob? = null, @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "id") - private var result: DBFlowResult? = null, + @JoinColumn(name = "result_id", referencedColumnName = "id") + var result: DBFlowResult? = null, @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "id") - private var exceptionDetails: DBFlowException? = null, + @JoinColumn(name = "error_id", referencedColumnName = "id") + var exceptionDetails: DBFlowException? = null, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "flow_id") - private var flowMetadata: DBFlowMetadata? = null, + var flowMetadata: DBFlowMetadata? = null, @Column(name = "status") - private var status: FlowStatus? = null, + var status: FlowStatus? = null, @Column(name = "compatible") - private var compatible: Boolean? = null, + var compatible: Boolean? = null, @Column(name = "progress_step") - private var progressStep: String? = null, + var progressStep: String? = null, @Column(name = "flow_io_request") - private val ioRequestType: Class<FlowIORequest<*>>? = null, + var ioRequestType: Class<FlowIORequest<*>>? = null, @Column(name = "timestamp") - private val checkpointInstant: Instant? = null + var checkpointInstant: Instant? = null ) @Entity @@ -85,7 +90,7 @@ class DBCheckpointStorage : CheckpointStorage { class DBFlowCheckpointBlob( @Id @Column(name = "id", nullable = false) - private var id: BigInteger? = null, + var id: BigInteger? = null, @Type(type = "corda-blob") @Column(name = "checkpoint_value", nullable = false) @@ -96,25 +101,89 @@ class DBCheckpointStorage : CheckpointStorage { var flowStack: ByteArray = EMPTY_BYTE_ARRAY, @Column(name = "timestamp") - private val instant: Instant? = null + var persistedInstant: Instant? = null ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results") class DBFlowResult( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + @Type(type = "corda-blob") + @Column(name = "result_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "timestamp") + val persistedInstant: Instant? = null ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions") class DBFlowException( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + @Column(name = "type", nullable = false) + var type: Class<Exception>? = null, + + @Type(type = "corda-blob") + @Column(name = "exception_value", nullable = false) + var value: ByteArray, + + @Column(name = "exception_message") + var message: String? = null, + + @Column(name = "timestamp") + val persistedInstant: Instant? = null ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata") class DBFlowMetadata( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + + @Column(name = "flow_id", length = 64, nullable = false) + var flowId: String? = null, + + @Column(name = "flow_name", nullable = false) + var flowName: String? = null, + + @Column(name = "flow_identifier", nullable = true) + var userSuppliedIdentifier: String? = null, + + @Column(name = "started_type", nullable = true) + var startType: StartReason? = null, + + @Column(name = "flow_parameters", nullable = true) + var initialParameters: ByteArray? = null, + + @Column(name = "cordapp_name", nullable = true) + var launchingCordapp: String? = null, + + @Column(name = "platform_version", nullable = true) + var platformVersion: Int? = null, + + @Column(name = "rpc_user", nullable = true) + var rpcUsername: String? = null, + + @Column(name = "invocation_time", nullable = true) + var invocationInstant: Instant? = null, + + @Column(name = "received_time", nullable = true) + var receivedInstant: Instant? = null, + + @Column(name = "start_time", nullable = true) + var startInstant: Instant? = null, + + @Column(name = "finish_time", nullable = true) + var finishInstant: Instant? = null + ) @Entity diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index a834dbb3ab..73d7cbb3d9 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -34,6 +34,14 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet() object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, + + //new checkpoints - keeping old around to allow testing easily (for now) + DBCheckpointStorage.DBFlowCheckpoint::class.java, + DBCheckpointStorage.DBFlowCheckpointBlob::class.java, + DBCheckpointStorage.DBFlowResult::class.java, + DBCheckpointStorage.DBFlowException::class.java, + DBCheckpointStorage.DBFlowMetadata::class.java, + DBTransactionStorage.DBTransaction::class.java, BasicHSMKeyManagementService.PersistentKey::class.java, NodeSchedulerService.PersistentScheduledState::class.java, diff --git a/node/src/main/resources/migration/node-core.changelog-master.xml b/node/src/main/resources/migration/node-core.changelog-master.xml index b320960f02..8c5a7916e1 100644 --- a/node/src/main/resources/migration/node-core.changelog-master.xml +++ b/node/src/main/resources/migration/node-core.changelog-master.xml @@ -31,8 +31,8 @@ <include file="migration/node-core.changelog-v14-data.xml"/> -<!-- <include file="migration/node-core.changelog-v17.xml"/>--> -<!-- <include file="migration/node-core.changelog-v17-postgres.xml"/>--> -<!-- <include file="migration/node-core.changelog-v17-keys.xml"/>--> + <include file="migration/node-core.changelog-v17.xml"/> + <include file="migration/node-core.changelog-v17-postgres.xml"/> + <include file="migration/node-core.changelog-v17-keys.xml"/> </databaseChangeLog> diff --git a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml index 3253f2ed22..fdc812a02c 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml @@ -5,7 +5,7 @@ logicalFilePath="migration/node-services.changelog-init.xml"> <changeSet author="R3.Corda" id="add_new_checkpoint_schema_primary_keys"> - <addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints"/> + <addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints_new"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/> <addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/> @@ -13,21 +13,21 @@ </changeSet> <changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys"> - <addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints" + <addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints_new" constraintName="node_checkpoint_blob_id_to_blob_table_fk" referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/> - <addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints" + <addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints_new" constraintName="node_checkpoints_to_exceptions_fk" referencedColumnNames="id" referencedTableName="node_flow_exceptions"/> - <addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints" + <addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints_new" constraintName="node_checkpoint_to_result_fk" referencedColumnNames="id" referencedTableName="node_flow_results"/> <addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_flow_metadata" constraintName="node_metadata_to_checkpoints_fk" - referencedColumnNames="flow_id" referencedTableName="node_checkpoints"/> + referencedColumnNames="flow_id" referencedTableName="node_checkpoints_new"/> </changeSet> diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index 5c4960b7a1..9602ec51b3 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -4,12 +4,8 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" logicalFilePath="migration/node-services.changelog-init.xml"> - <changeSet author="R3.Corda" id="drop_existing_checkpoints_table" dbms="postgresql"> - <dropTable tableName="node_checkpoints"/> - </changeSet> - - <changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="postgresql"> - <createTable tableName="node_checkpoints"> + <changeSet author="R3.Corda" id="add_new_checkpoints_table-pg" dbms="postgresql"> + <createTable tableName="node_checkpoints_new"> <column name="flow_id" type="NVARCHAR(64)"> <constraints nullable="false"/> </column> @@ -22,7 +18,7 @@ <column name="error_id" type="BIGINT"> <constraints nullable="true"/> </column> - <column name="status" type="NVARCHAR(16)"> + <column name="status" type="TINYINT"> <constraints nullable="false"/> </column> <column name="compatible" type="BOOLEAN"> @@ -31,7 +27,8 @@ <column name="progress_step" type="NVARCHAR(256)"> <constraints nullable="true"/> </column> - <column name="flow_io_request" type="NVARCHAR(32)"> + <!-- net.corda.core.internal.FlowIORequest.SendAndReceive --> + <column name="flow_io_request" type="NVARCHAR(128)"> <constraints nullable="true"/> </column> <column name="timestamp" type="java.sql.Types.TIMESTAMP"> @@ -40,7 +37,7 @@ </createTable> </changeSet> - <changeSet author="R3.Corda" id="add_new_checkpoint_blobs_table" dbms="postgresql"> + <changeSet author="R3.Corda" id="add_new_checkpoint_blob_table-pg" dbms="postgresql"> <createTable tableName="node_checkpoint_blobs"> <column name="id" type="BIGINT"> <constraints nullable="false"/> @@ -61,7 +58,7 @@ </changeSet> - <changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="postgresql"> + <changeSet author="R3.Corda" id="add_new_flow_result_table-pg" dbms="postgresql"> <createTable tableName="node_flow_results"> <column name="id" type="BIGINT"> <constraints nullable="false"/> @@ -75,12 +72,12 @@ </createTable> </changeSet> - <changeSet author="R3.Corda" id="add_new_flow_exceptions_table" dbms="postgresql"> + <changeSet author="R3.Corda" id="add_new_flow_exceptions_table-pg" dbms="postgresql"> <createTable tableName="node_flow_exceptions"> <column name="id" type="BIGINT"> <constraints nullable="false"/> </column> - <column name="type" type="NVARCHAR(128)"> + <column name="type" type="NVARCHAR(256)"> <constraints nullable="false"/> </column> <column name="exception_value" type="varbinary(33554432)"> @@ -95,7 +92,7 @@ </createTable> </changeSet> - <changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="postgresql"> + <changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql"> <createTable tableName="node_flow_metadata"> <column name="invocation_id" type="BIGINT"> <constraints nullable="false"/> @@ -109,7 +106,7 @@ <column name="flow_identifier" type="NVARCHAR(512)"> <constraints nullable="true"/> </column> - <column name="started_type" type="NVARCHAR(32)"> + <column name="started_type" type="TINYINT"> <constraints nullable="false"/> </column> <column name="flow_parameters" type="varbinary(33554432)"> @@ -118,7 +115,7 @@ <column name="cordapp_name" type="NVARCHAR(64)"> <constraints nullable="false"/> </column> - <column name="platform_version" type="BIGINT"> + <column name="platform_version" type="TINYINT"> <constraints nullable="false"/> </column> <column name="rpc_user" type="NVARCHAR(64)"> diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 70dfa0f3f3..2c1789fc95 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -4,12 +4,8 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" logicalFilePath="migration/node-services.changelog-init.xml"> - <changeSet author="R3.Corda" id="drop_existing_checkpoints_table" dbms="!postgresql"> - <dropTable tableName="node_checkpoints"/> - </changeSet> - <changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="!postgresql"> - <createTable tableName="node_checkpoints"> + <createTable tableName="node_checkpoints_new"> <column name="flow_id" type="NVARCHAR(64)"> <constraints nullable="false"/> </column> @@ -22,7 +18,7 @@ <column name="error_id" type="BIGINT"> <constraints nullable="true"/> </column> - <column name="status" type="BIGINT"> + <column name="status" type="TINYINT"> <constraints nullable="false"/> </column> <column name="compatible" type="BOOLEAN"> @@ -81,7 +77,7 @@ <column name="id" type="BIGINT"> <constraints nullable="false"/> </column> - <column name="type" type="NVARCHAR(128)"> + <column name="type" type="NVARCHAR(256)"> <constraints nullable="false"/> </column> <column name="exception_value" type="blob"> @@ -110,7 +106,7 @@ <column name="flow_identifier" type="NVARCHAR(512)"> <constraints nullable="true"/> </column> - <column name="started_type" type="NVARCHAR(32)"> + <column name="started_type" type="TINYINT"> <constraints nullable="false"/> </column> <column name="flow_parameters" type="blob"> @@ -119,7 +115,7 @@ <column name="cordapp_name" type="NVARCHAR(64)"> <constraints nullable="false"/> </column> - <column name="platform_version" type="BIGINT"> + <column name="platform_version" type="TINYINT"> <constraints nullable="false"/> </column> <column name="rpc_user" type="NVARCHAR(64)"> From eec9ada6bab92360e2e27ef4480137417fb29f7d Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Thu, 30 Jan 2020 18:18:45 +0000 Subject: [PATCH 15/17] persist some nonsense data to see if it breaks the statemachine --- .../persistence/DBCheckpointStorage.kt | 5 +-- .../SingleThreadedStateMachineManager.kt | 42 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index e2449568e7..cf058dd930 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -66,7 +66,7 @@ class DBCheckpointStorage : CheckpointStorage { var exceptionDetails: DBFlowException? = null, @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "flow_id") + @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") var flowMetadata: DBFlowMetadata? = null, @Column(name = "status") @@ -145,9 +145,6 @@ class DBCheckpointStorage : CheckpointStorage { class DBFlowMetadata( @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, - @Column(name = "flow_id", length = 64, nullable = false) var flowId: String? = null, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 34c9b77582..3383ef9ba7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -34,6 +34,7 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine @@ -42,6 +43,7 @@ import net.corda.node.utilities.errorAndTerminate import net.corda.node.utilities.injectOldProgressTracker import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl import net.corda.serialization.internal.withTokenContext @@ -586,6 +588,46 @@ class SingleThreadedStateMachineManager( val flowAlreadyExists = mutex.locked { flows[flowId] != null } val existingCheckpoint = if (flowAlreadyExists) { + + val currentDBSession = currentDBSession() + val dbFlowCheckpoint = currentDBSession.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, flowId.toString()) + ?: DBCheckpointStorage.DBFlowCheckpoint( + flowId.toString(), + null, + null, + null, + null, + DBCheckpointStorage.FlowStatus.RUNNABLE, + true, + flowLogic.progressTracker?.currentStep?.toString(), + null, + null + ) + + var cordappName : String? = null + var initiatingParty: String? = null + var startReason: DBCheckpointStorage.StartReason = DBCheckpointStorage.StartReason.FLOW + when (flowStart) { + is FlowStart.Initiated -> { + cordappName = flowStart.initiatedFlowInfo.appName + initiatingParty = flowStart.peerSession.counterparty.name.toString() + startReason = DBCheckpointStorage.StartReason.INITIATED + } + } + + currentDBSession.get(DBCheckpointStorage.DBFlowMetadata::class.java, flowId.toString()) ?: DBCheckpointStorage.DBFlowMetadata( + flowId = flowId.toString(), + flowName = "thisIsAPlaceholder", + userSuppliedIdentifier = "thisIsAnotherPlaceholder", + startType = startReason, + initialParameters = null, + launchingCordapp = cordappName + ) + + + currentDBSession.persist(dbFlowCheckpoint) + + // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) From a28c15c2fd5fc4175f8e0e355180586040e9a78d Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 10 Feb 2020 11:52:54 +0000 Subject: [PATCH 16/17] address dan review comments --- .../persistence/DBCheckpointStorage.kt | 46 ++++----- .../SingleThreadedStateMachineManager.kt | 98 ++++++------------- 2 files changed, 51 insertions(+), 93 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index cf058dd930..c2ba70135c 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -51,38 +51,38 @@ class DBCheckpointStorage : CheckpointStorage { class DBFlowCheckpoint( @Id @Column(name = "flow_id", length = 64, nullable = false) - var id: String? = null, + var id: String, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") - var blob: DBFlowCheckpointBlob? = null, + var blob: DBFlowCheckpointBlob, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "result_id", referencedColumnName = "id") - var result: DBFlowResult? = null, + var result: DBFlowResult, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "error_id", referencedColumnName = "id") - var exceptionDetails: DBFlowException? = null, + var exceptionDetails: DBFlowException, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") - var flowMetadata: DBFlowMetadata? = null, + var flowMetadata: DBFlowMetadata, @Column(name = "status") - var status: FlowStatus? = null, + var status: FlowStatus, @Column(name = "compatible") - var compatible: Boolean? = null, + var compatible: Boolean, @Column(name = "progress_step") - var progressStep: String? = null, + var progressStep: String, @Column(name = "flow_io_request") - var ioRequestType: Class<FlowIORequest<*>>? = null, + var ioRequestType: Class<FlowIORequest<*>>, @Column(name = "timestamp") - var checkpointInstant: Instant? = null + var checkpointInstant: Instant ) @Entity @@ -131,7 +131,7 @@ class DBCheckpointStorage : CheckpointStorage { @Type(type = "corda-blob") @Column(name = "exception_value", nullable = false) - var value: ByteArray, + var value: ByteArray = EMPTY_BYTE_ARRAY, @Column(name = "exception_message") var message: String? = null, @@ -146,40 +146,40 @@ class DBCheckpointStorage : CheckpointStorage { @Id @Column(name = "flow_id", length = 64, nullable = false) - var flowId: String? = null, + var flowId: String, @Column(name = "flow_name", nullable = false) - var flowName: String? = null, + var flowName: String, @Column(name = "flow_identifier", nullable = true) - var userSuppliedIdentifier: String? = null, + var userSuppliedIdentifier: String, @Column(name = "started_type", nullable = true) - var startType: StartReason? = null, + var startType: StartReason, @Column(name = "flow_parameters", nullable = true) - var initialParameters: ByteArray? = null, + var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, @Column(name = "cordapp_name", nullable = true) - var launchingCordapp: String? = null, + var launchingCordapp: String, @Column(name = "platform_version", nullable = true) - var platformVersion: Int? = null, + var platformVersion: Int, @Column(name = "rpc_user", nullable = true) - var rpcUsername: String? = null, + var rpcUsername: String, @Column(name = "invocation_time", nullable = true) - var invocationInstant: Instant? = null, + var invocationInstant: Instant, @Column(name = "received_time", nullable = true) - var receivedInstant: Instant? = null, + var receivedInstant: Instant, @Column(name = "start_time", nullable = true) - var startInstant: Instant? = null, + var startInstant: Instant, @Column(name = "finish_time", nullable = true) - var finishInstant: Instant? = null + var finishInstant: Instant ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 3383ef9ba7..371d5ed92a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -34,7 +34,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine @@ -43,7 +42,6 @@ import net.corda.node.utilities.errorAndTerminate import net.corda.node.utilities.injectOldProgressTracker import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl import net.corda.serialization.internal.withTokenContext @@ -372,11 +370,11 @@ class SingleThreadedStateMachineManager( } // Resurrect flow createFlowFromCheckpoint( - id = flowId, - serializedCheckpoint = serializedCheckpoint, - initialDeduplicationHandler = null, - isAnyCheckpointPersisted = true, - isStartIdempotent = false + id = flowId, + serializedCheckpoint = serializedCheckpoint, + initialDeduplicationHandler = null, + isAnyCheckpointPersisted = true, + isStartIdempotent = false ) ?: return } else { // Just flow initiation message @@ -411,9 +409,9 @@ class SingleThreadedStateMachineManager( // Failed to retry - manually put the flow in for observation rather than // relying on the [HospitalisingInterceptor] to do so val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored) - ?.errors - ?.map { it.exception } - ?.plus(e) ?: emptyList() + ?.errors + ?.map { it.exception } + ?.plus(e) ?: emptyList() logger.info("Failed to retry flow $flowId, keeping in for observation and aborting") flowHospital.forceIntoOvernightObservation(flowId, exceptions) throw e @@ -433,11 +431,11 @@ class SingleThreadedStateMachineManager( private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) { val future = startFlow( - event.flowId, - event.flowLogic, - event.context, - ourIdentity = null, - deduplicationHandler = event.deduplicationHandler + event.flowId, + event.flowLogic, + event.context, + ourIdentity = null, + deduplicationHandler = event.deduplicationHandler ) event.wireUpFuture(future) } @@ -505,14 +503,14 @@ class SingleThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> null } startInitiatedFlow( - event.flowId, - flowLogic, - event.deduplicationHandler, - senderSession, - initiatedSessionId, - sessionMessage, - senderCoreFlowVersion, - initiatedFlowInfo + event.flowId, + flowLogic, + event.deduplicationHandler, + senderSession, + initiatedSessionId, + sessionMessage, + senderCoreFlowVersion, + initiatedFlowInfo ) } catch (t: Throwable) { logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + @@ -588,46 +586,6 @@ class SingleThreadedStateMachineManager( val flowAlreadyExists = mutex.locked { flows[flowId] != null } val existingCheckpoint = if (flowAlreadyExists) { - - val currentDBSession = currentDBSession() - val dbFlowCheckpoint = currentDBSession.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, flowId.toString()) - ?: DBCheckpointStorage.DBFlowCheckpoint( - flowId.toString(), - null, - null, - null, - null, - DBCheckpointStorage.FlowStatus.RUNNABLE, - true, - flowLogic.progressTracker?.currentStep?.toString(), - null, - null - ) - - var cordappName : String? = null - var initiatingParty: String? = null - var startReason: DBCheckpointStorage.StartReason = DBCheckpointStorage.StartReason.FLOW - when (flowStart) { - is FlowStart.Initiated -> { - cordappName = flowStart.initiatedFlowInfo.appName - initiatingParty = flowStart.peerSession.counterparty.name.toString() - startReason = DBCheckpointStorage.StartReason.INITIATED - } - } - - currentDBSession.get(DBCheckpointStorage.DBFlowMetadata::class.java, flowId.toString()) ?: DBCheckpointStorage.DBFlowMetadata( - flowId = flowId.toString(), - flowName = "thisIsAPlaceholder", - userSuppliedIdentifier = "thisIsAnotherPlaceholder", - startType = startReason, - initialParameters = null, - launchingCordapp = cordappName - ) - - - currentDBSession.persist(dbFlowCheckpoint) - - // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -647,13 +605,13 @@ class SingleThreadedStateMachineManager( null } val checkpoint = existingCheckpoint ?: Checkpoint.create( - invocationContext, - flowStart, - flowLogic.javaClass, - frozenFlowLogic, - ourIdentity, - flowCorDappVersion, - flowLogic.isEnabledTimedFlow() + invocationContext, + flowStart, + flowLogic.javaClass, + frozenFlowLogic, + ourIdentity, + flowCorDappVersion, + flowLogic.isEnabledTimedFlow() ).getOrThrow() val startedFuture = openFuture<Unit>() From 9e8ce6473d2c12bd16b54fbefdb97d44ac8fcb67 Mon Sep 17 00:00:00 2001 From: stefano <stefano.franz@r3.com> Date: Mon, 10 Feb 2020 13:22:51 +0000 Subject: [PATCH 17/17] address dan review comments pt2 --- .../persistence/DBCheckpointStorage.kt | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index c2ba70135c..c9d883a5b0 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -59,11 +59,11 @@ class DBCheckpointStorage : CheckpointStorage { @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "result_id", referencedColumnName = "id") - var result: DBFlowResult, + var result: DBFlowResult?, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "error_id", referencedColumnName = "id") - var exceptionDetails: DBFlowException, + var exceptionDetails: DBFlowException?, @OneToOne(fetch = FetchType.LAZY) @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") @@ -127,7 +127,7 @@ class DBCheckpointStorage : CheckpointStorage { var id: BigInteger? = null, @Column(name = "type", nullable = false) - var type: Class<Exception>? = null, + var type: Class<Exception>, @Type(type = "corda-blob") @Column(name = "exception_value", nullable = false) @@ -152,34 +152,34 @@ class DBCheckpointStorage : CheckpointStorage { var flowName: String, @Column(name = "flow_identifier", nullable = true) - var userSuppliedIdentifier: String, + var userSuppliedIdentifier: String?, - @Column(name = "started_type", nullable = true) + @Column(name = "started_type", nullable = false) var startType: StartReason, - @Column(name = "flow_parameters", nullable = true) + @Column(name = "flow_parameters", nullable = false) var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "cordapp_name", nullable = true) + @Column(name = "cordapp_name", nullable = false) var launchingCordapp: String, - @Column(name = "platform_version", nullable = true) + @Column(name = "platform_version", nullable = false) var platformVersion: Int, - @Column(name = "rpc_user", nullable = true) + @Column(name = "rpc_user", nullable = false) var rpcUsername: String, - @Column(name = "invocation_time", nullable = true) + @Column(name = "invocation_time", nullable = false) var invocationInstant: Instant, - @Column(name = "received_time", nullable = true) + @Column(name = "received_time", nullable = false) var receivedInstant: Instant, @Column(name = "start_time", nullable = true) - var startInstant: Instant, + var startInstant: Instant?, @Column(name = "finish_time", nullable = true) - var finishInstant: Instant + var finishInstant: Instant? )