Merge remote-tracking branch 'origin/feature/checkpoint_table_improvements' into feature/checkpoint_table_improvements

This commit is contained in:
stefano 2020-02-13 12:08:07 +00:00
commit 13c52e4901
10 changed files with 525 additions and 35 deletions

View File

@ -45,6 +45,7 @@ sealed class FlowIORequest<out R : Any> {
* @property shouldRetrySend specifies whether the send should be retried.
* @return a map from session to received message.
*/
//net.corda.core.internal.FlowIORequest.SendAndReceive
data class SendAndReceive(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean
@ -80,7 +81,15 @@ sealed class FlowIORequest<out R : Any> {
/**
* Suspend the flow until all Initiating sessions are confirmed.
*/
object WaitForSessionConfirmations : FlowIORequest<Unit>()
class WaitForSessionConfirmations : FlowIORequest<Unit>() {
override fun equals(other: Any?): Boolean {
return this === other
}
override fun hashCode(): Int {
return System.identityHashCode(this)
}
}
/**
* Execute the specified [operation], suspend the flow until completion.

View File

@ -1,7 +1,9 @@
package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
@ -16,8 +18,14 @@ import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import org.hibernate.annotations.Type
import java.lang.Exception
import java.math.BigInteger
import java.sql.Connection
import java.sql.SQLException
import java.time.Instant
import javax.persistence.FetchType
import javax.persistence.JoinColumn
import javax.persistence.OneToOne
/**
* Simple checkpoint key value storage in DB.
@ -25,6 +33,156 @@ import java.sql.SQLException
class DBCheckpointStorage : CheckpointStorage {
val log: Logger = LoggerFactory.getLogger(this::class.java)
enum class FlowStatus {
RUNNABLE,
FAILED,
COMPLETED,
HOSPITALIZED,
KILLED,
PAUSED
}
enum class StartReason {
RPC, FLOW, SERVICE, SCHEDULED, INITIATED
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new")
class DBFlowCheckpoint(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var id: String,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "flow_id", referencedColumnName = "flow_id")
var flowMetadata: DBFlowMetadata,
@Column(name = "status")
var status: FlowStatus,
@Column(name = "compatible")
var compatible: Boolean,
@Column(name = "progress_step")
var progressStep: String,
@Column(name = "flow_io_request")
var ioRequestType: Class<FlowIORequest<*>>,
@Column(name = "timestamp")
var checkpointInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs")
class DBFlowCheckpointBlob(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "flow_state", nullable = false)
var flowStack: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
var persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results")
class DBFlowResult(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions")
class DBFlowException(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Column(name = "type", nullable = false)
var type: Class<Exception>,
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "exception_message")
var message: String? = null,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata")
class DBFlowMetadata(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var flowId: String,
@Column(name = "flow_name", nullable = false)
var flowName: String,
@Column(name = "flow_identifier", nullable = true)
var userSuppliedIdentifier: String?,
@Column(name = "started_type", nullable = false)
var startType: StartReason,
@Column(name = "flow_parameters", nullable = false)
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "cordapp_name", nullable = false)
var launchingCordapp: String,
@Column(name = "platform_version", nullable = false)
var platformVersion: Int,
@Column(name = "rpc_user", nullable = false)
var rpcUsername: String,
@Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant,
@Column(name = "received_time", nullable = false)
var receivedInstant: Instant,
@Column(name = "start_time", nullable = true)
var startInstant: Instant?,
@Column(name = "finish_time", nullable = true)
var finishInstant: Instant?
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBCheckpoint(
@ -35,10 +193,10 @@ class DBCheckpointStorage : CheckpointStorage {
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY
) {
override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})"
}
override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})"
}
override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
currentDBSession().save(DBCheckpoint().apply {
@ -56,7 +214,6 @@ class DBCheckpointStorage : CheckpointStorage {
})
}
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder

View File

@ -34,6 +34,14 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
//new checkpoints - keeping old around to allow testing easily (for now)
DBCheckpointStorage.DBFlowCheckpoint::class.java,
DBCheckpointStorage.DBFlowCheckpointBlob::class.java,
DBCheckpointStorage.DBFlowResult::class.java,
DBCheckpointStorage.DBFlowException::class.java,
DBCheckpointStorage.DBFlowMetadata::class.java,
DBTransactionStorage.DBTransaction::class.java,
BasicHSMKeyManagementService.PersistentKey::class.java,
NodeSchedulerService.PersistentScheduledState::class.java,

View File

@ -81,7 +81,7 @@ internal class FlowMonitor(
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow"
}

View File

@ -269,7 +269,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
val result = logic.call()
suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true)
suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true)
Try.Success(result)
} catch (t: Throwable) {
if(t.isUnrecoverable()) {

View File

@ -370,11 +370,11 @@ class SingleThreadedStateMachineManager(
}
// Resurrect flow
createFlowFromCheckpoint(
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
) ?: return
} else {
// Just flow initiation message
@ -409,9 +409,9 @@ class SingleThreadedStateMachineManager(
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
throw e
@ -431,11 +431,11 @@ class SingleThreadedStateMachineManager(
private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) {
val future = startFlow(
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
)
event.wireUpFuture(future)
}
@ -503,14 +503,14 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.CorDapp -> null
}
startInitiatedFlow(
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
)
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
@ -605,13 +605,13 @@ class SingleThreadedStateMachineManager(
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
).getOrThrow()
val startedFuture = openFuture<Unit>()

View File

@ -31,4 +31,8 @@
<include file="migration/node-core.changelog-v14-data.xml"/>
<include file="migration/node-core.changelog-v17.xml"/>
<include file="migration/node-core.changelog-v17-postgres.xml"/>
<include file="migration/node-core.changelog-v17-keys.xml"/>
</databaseChangeLog>

View File

@ -0,0 +1,34 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_primary_keys">
<addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints_new"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/>
<addPrimaryKey columnNames="invocation_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys">
<addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_blob_id_to_blob_table_fk"
referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/>
<addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoints_to_exceptions_fk"
referencedColumnNames="id" referencedTableName="node_flow_exceptions"/>
<addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_to_result_fk"
referencedColumnNames="id" referencedTableName="node_flow_results"/>
<addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_flow_metadata"
constraintName="node_metadata_to_checkpoints_fk"
referencedColumnNames="flow_id" referencedTableName="node_checkpoints_new"/>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,139 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table-pg" dbms="postgresql">
<createTable tableName="node_checkpoints_new">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="checkpoint_blob_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="result_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="compatible" type="BOOLEAN">
<constraints nullable="false"/>
</column>
<column name="progress_step" type="NVARCHAR(256)">
<constraints nullable="true"/>
</column>
<!-- net.corda.core.internal.FlowIORequest.SendAndReceive -->
<column name="flow_io_request" type="NVARCHAR(128)">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_blob_table-pg" dbms="postgresql">
<createTable tableName="node_checkpoint_blobs">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="checkpoint_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="flow_state" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="hmac" type="NVARCHAR(32)">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_result_table-pg" dbms="postgresql">
<createTable tableName="node_flow_results">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="result_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_exceptions_table-pg" dbms="postgresql">
<createTable tableName="node_flow_exceptions">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="true"/>
</column>
<column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_identifier" type="NVARCHAR(512)">
<constraints nullable="true"/>
</column>
<column name="started_type" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="flow_parameters" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="cordapp_name" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="platform_version" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="rpc_user" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
</createTable>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,139 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="!postgresql">
<createTable tableName="node_checkpoints_new">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="checkpoint_blob_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="result_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="compatible" type="BOOLEAN">
<constraints nullable="false"/>
</column>
<column name="progress_step" type="NVARCHAR(256)">
<constraints nullable="true"/>
</column>
<!-- net.corda.core.internal.FlowIORequest.SendAndReceive -->
<column name="flow_io_request" type="NVARCHAR(128)">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_blob_table" dbms="!postgresql">
<createTable tableName="node_checkpoint_blobs">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="checkpoint_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="flow_state" type="blob">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="hmac" type="NVARCHAR(32)">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="!postgresql">
<createTable tableName="node_flow_results">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="result_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_exceptions_table" dbms="!postgresql">
<createTable tableName="node_flow_exceptions">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="true"/>
</column>
<column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_identifier" type="NVARCHAR(512)">
<constraints nullable="true"/>
</column>
<column name="started_type" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="flow_parameters" type="blob">
<constraints nullable="false"/>
</column>
<column name="cordapp_name" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="platform_version" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="rpc_user" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
</createTable>
</changeSet>
</databaseChangeLog>