Grant Limberg be7ce4110e
Revert "Delete and re-add libpqxx-7.7.3 due to weird corruption."
This reverts commit e96515433d71684a5a9a876c7af93530e11e160b.
2022-06-24 10:12:36 -07:00

456 lines
17 KiB
C++

/* Definition of the pqxx::stream_to class.
*
* pqxx::stream_to enables optimized batch updates to a database table.
*
* DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/stream_to.hxx instead.
*
* Copyright (c) 2000-2022, Jeroen T. Vermeulen.
*
* See COPYING for copyright license. If you did not receive a file called
* COPYING with this source code, please notify the distributor of this
* mistake, or contact the author.
*/
#ifndef PQXX_H_STREAM_TO
#define PQXX_H_STREAM_TO
#if !defined(PQXX_HEADER_PRE)
# error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
#endif
#include "pqxx/separated_list.hxx"
#include "pqxx/transaction_base.hxx"
namespace pqxx
{
/// Efficiently write data directly to a database table.
/** If you wish to insert rows of data into a table, you can compose INSERT
* statements and execute them. But it's slow and tedious, and you need to
* worry about quoting and escaping the data.
*
* If you're just inserting a single row, it probably won't matter much. You
* can use prepared or parameterised statements to take care of the escaping
* for you. But if you're inserting large numbers of rows you will want
* something better.
*
* Inserting rows one by one using INSERT statements involves a lot of
* pointless overhead, especially when you are working with a remote database
* server over the network. You may end up sending each row over the network
* as a separate query, and waiting for a reply. Do it "in bulk" using
* `stream_to`, and you may find that it goes many times faster. Sometimes
* you gain orders of magnitude in speed.
*
* Here's how it works: you create a `stream_to` stream to start writing to
* your table. You will probably want to specify the columns. Then, you
* feed your data into the stream one row at a time. And finally, you call the
* stream's @ref complete function to tell it to finalise the operation, wait
* for completion, and check for errors.
*
* (You _must_ complete the stream before committing or aborting the
* transaction. The connection is in a special state while the stream is
* active, where it can't process commands, and can't commit or abort a
* transaction.)
*
* So how do you feed a row of data into the stream? There are several ways,
* but the preferred one is to call its @ref write_values. Pass the field
* values as arguments. Doesn't matter what type they are, as long as libpqxx
* knows how to convert them to PostgreSQL's text format: `int`, `std::string`
* or `std::string_view`, `float` and `double`, `bool`... lots of basic types
* are supported. If some of the values are null, feel free to use
* `std::optional`, `std::shared_ptr`, or `std::unique_ptr`.
*
* The arguments' types don't even have to match the fields' SQL types. If you
* want to insert an `int` into a `DECIMAL` column, that's your choice -- it
* will produce a `DECIMAL` value which happens to be integral. Insert a
* `float` into a `VARCHAR` column? That's fine, you'll get a string whose
* contents happen to read like a number. And so on. You can even insert
* different types of value in the same column on different rows. If you have
* a code path where a particular field is always null, just insert `nullptr`.
*
* There is another way to insert rows: the `<<` ("shift-left") operator.
* It's not as fast and it doesn't support variable arguments: each row must be
* either a `std::tuple` or something iterable, such as a `std::vector`, or
* anything else with a `begin()` and `end()`.
*
* @warning While a stream is active, you cannot execute queries, open a
* pipeline, etc. on the same transaction. A transaction can have at most one
* object of a type derived from @ref pqxx::transaction_focus active on it at a
* time.
*/
class PQXX_LIBEXPORT stream_to : transaction_focus
{
public:
  /// Stream data to a pre-quoted table and columns.
  /** This factory can be useful when it's not convenient to provide the
   * columns list in the form of a `std::initializer_list`, or when the list
   * of columns is simply not known at compile time.
   *
   * Also use this if you need to create multiple streams using the same table
   * path and/or columns list, and you want to save a bit of work on composing
   * the internal SQL statement for starting the stream.  It lets you compose
   * the string representations for the table path and the columns list, so you
   * can compute these once and then re-use them later.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path Name or path for the table upon which the stream will
   *     operate.  If any part of the table path may contain special
   *     characters or be case-sensitive, quote the path using
   *     pqxx::connection::quote_table().
   * @param columns Columns to which the stream will write.  They should be
   *     comma-separated and, if needed, quoted.  You can produce the string
   *     using pqxx::connection::quote_columns().  If you omit this argument,
   *     the stream will write all columns in the table, in schema order.
   * @return The newly created stream.
   */
  static stream_to raw_table(
    transaction_base &tx, std::string_view path, std::string_view columns = "")
  {
    return {tx, path, columns};
  }

  /// Create a `stream_to` writing to a named table and columns.
  /** Use this to stream data to a table, where the list of columns is known at
   * compile time.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path A @ref table_path designating the target table.
   * @param columns Optionally, the columns to which the stream should write.
   *     If you do not pass this, the stream will write to all columns in the
   *     table, in schema order.
   * @return The newly created stream.
   */
  static stream_to table(
    transaction_base &tx, table_path path,
    std::initializer_list<std::string_view> columns = {})
  {
    auto const &conn{tx.conn()};
    return raw_table(tx, conn.quote_table(path), conn.quote_columns(columns));
  }

#if defined(PQXX_HAVE_CONCEPTS)
  /// Create a `stream_to` writing to a named table and columns.
  /** Use this version to stream data to a table, when the list of columns is
   * not known at compile time.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path A @ref table_path designating the target table.
   * @param columns The columns to which the stream should write.
   * @return The newly created stream.
   */
  template<PQXX_CHAR_STRINGS_ARG COLUMNS>
  static stream_to
  table(transaction_base &tx, table_path path, COLUMNS const &columns)
  {
    // Look up the connection once and use it for both quoting calls, just
    // like the initializer_list overload does.
    auto const &conn{tx.conn()};
    return stream_to::raw_table(
      tx, conn.quote_table(path), conn.quote_columns(columns));
  }

  /// Create a `stream_to` writing to a named table and columns.
  /** Use this version to stream data to a table, when the list of columns is
   * not known at compile time and you already have the table path as a
   * string.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path Name or path for the target table.  It is handed to
   *     @ref raw_table unchanged, so quote it yourself if needed, e.g. using
   *     pqxx::connection::quote_table().
   * @param columns The columns to which the stream should write.  These do
   *     get quoted, using pqxx::connection::quote_columns().
   * @return The newly created stream.
   */
  template<PQXX_CHAR_STRINGS_ARG COLUMNS>
  static stream_to
  table(transaction_base &tx, std::string_view path, COLUMNS const &columns)
  {
    return stream_to::raw_table(tx, path, tx.conn().quote_columns(columns));
  }
#endif // PQXX_HAVE_CONCEPTS

  /// Create a stream, without specifying columns.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   *
   * Fields will be inserted in whatever order the columns have in the
   * database.
   *
   * You'll probably want to specify the columns, so that the mapping between
   * your data fields and the table is explicit in your code, and not hidden
   * in an "implicit contract" between your code and your schema.
   */
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &tx, std::string_view table_name) :
          // The empty columns list is passed as an actual std::string_view
          // so that this delegates to the private pre-quoted constructor.
          stream_to{tx, table_name, ""sv}
  {}

  /// Create a stream, specifying column names as a container of strings.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   */
  template<typename Columns>
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &, std::string_view table_name, Columns const &columns);

  /// Create a stream, specifying column names as a sequence of strings.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   */
  template<typename Iter>
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &, std::string_view table_name, Iter columns_begin,
    Iter columns_end);

  /// Destroy the stream.  Defined out of line.
  /** Prefer calling @ref complete explicitly before the stream goes out of
   * scope, so that you get to see any errors.
   */
  ~stream_to() noexcept;

  /// Does this stream still need to @ref complete()?
  [[nodiscard]] constexpr operator bool() const noexcept
  {
    return not m_finished;
  }
  /// Has this stream been through its concluding @c complete()?
  [[nodiscard]] constexpr bool operator!() const noexcept
  {
    return m_finished;
  }

  /// Complete the operation, and check for errors.
  /** Always call this to close the stream in an orderly fashion, even after
   * an error.  (In the case of an error, abort the transaction afterwards.)
   *
   * The only circumstance where it's safe to skip this is after an error, if
   * you're discarding the entire connection.
   */
  void complete();

  /// Insert a row of data.
  /** Returns a reference to the stream, so you can chain the calls.
   *
   * The @c row can be a tuple, or any type that can be iterated.  Each
   * item becomes a field in the row, in the same order as the columns you
   * specified when creating the stream.
   *
   * If you don't already happen to have your fields in the form of a tuple or
   * container, prefer @c write_values.  It's faster and more convenient.
   */
  template<typename Row> stream_to &operator<<(Row const &row)
  {
    write_row(row);
    return *this;
  }

  /// Stream a `stream_from` straight into a `stream_to`.
  /** This can be useful when copying between different databases.  If the
   * source and the destination are on the same database, you'll get better
   * performance doing it all in a regular query.
   */
  stream_to &operator<<(stream_from &);

  /// Insert a row of data, given in the form of a @c std::tuple or container.
  /** The @c row can be a tuple, or any type that can be iterated.  Each
   * item becomes a field in the row, in the same order as the columns you
   * specified when creating the stream.
   *
   * The preferred way to insert a row is @c write_values.
   */
  template<typename Row> void write_row(Row const &row)
  {
    fill_buffer(row);
    write_buffer();
  }

  /// Insert values as a row.
  /** This is the recommended way of inserting data.  Pass your field values,
   * of any convertible type.
   */
  template<typename... Ts> void write_values(Ts const &...fields)
  {
    fill_buffer(fields...);
    write_buffer();
  }

private:
  /// Stream a pre-quoted table name and columns list.
  /** This is the constructor to which the factories and the deprecated
   * public constructors delegate.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path Table name or path, already quoted where needed.
   * @param columns Comma-separated columns list, already quoted where needed.
   */
  stream_to(
    transaction_base &tx, std::string_view path, std::string_view columns);

  /// Set once the stream no longer needs @ref complete.
  bool m_finished = false;

  /// Reusable buffer for a row.  Saves doing an allocation for each row.
  std::string m_buffer;

  /// Reusable buffer for converting/escaping a field.
  std::string m_field_buf;

  /// Glyph scanner, for parsing the client encoding.
  internal::glyph_scanner_func *m_scanner;

  /// Write a row of raw text-format data into the destination table.
  void write_raw_line(std::string_view);

  /// Write a row of data from @c m_buffer into the destination table.
  /** Resets the buffer for the next row.
   */
  void write_buffer();

  /// COPY encoding for a null field, plus subsequent separator.
  static constexpr std::string_view null_field{"\\N\t"};

  /// Estimate buffer space needed for a field which is always null.
  template<typename T>
  static std::enable_if_t<nullness<T>::always_null, std::size_t>
  estimate_buffer(T const &)
  {
    return std::size(null_field);
  }

  /// Estimate buffer space needed for field f.
  /** The estimate is not very precise.  We don't actually know how much space
   * we'll need once the escaping comes in.
   */
  template<typename T>
  static std::enable_if_t<not nullness<T>::always_null, std::size_t>
  estimate_buffer(T const &field)
  {
    return is_null(field) ? std::size(null_field) : size_buffer(field);
  }

  /// Append escaped version of @c data to @c m_buffer, plus a tab.
  void escape_field_to_buffer(std::string_view data);

  /// Append string representation for @c f to @c m_buffer.
  /** This is for the general case, where the field may contain a value.
   *
   * Also appends a tab.  The tab is meant to be a separator, not a terminator,
   * so if you write any fields at all, you'll end up with one tab too many
   * at the end of the buffer.
   */
  template<typename Field>
  std::enable_if_t<not nullness<Field>::always_null>
  append_to_buffer(Field const &f)
  {
    // We append each field, terminated by a tab.  That will leave us with
    // one tab too many, assuming we write any fields at all; we remove that
    // at the end.
    if (is_null(f))
    {
      // Easy.  Append null and tab in one go.
      m_buffer.append(null_field);
    }
    else
    {
      // Convert f into m_buffer.
      using traits = string_traits<Field>;
      auto const budget{estimate_buffer(f)};
      auto const offset{std::size(m_buffer)};
      if constexpr (std::is_arithmetic_v<Field>)
      {
        // Specially optimised for "safe" types, which never need any
        // escaping.  Convert straight into m_buffer.
        // The budget we get from size_buffer() includes room for the trailing
        // zero, which we must remove.  But we're also inserting tabs between
        // fields, so we re-purpose the extra byte for that.
        auto const total{offset + budget};
        m_buffer.resize(total);
        auto const data{m_buffer.data()};
        char *const end{traits::into_buf(data + offset, data + total, f)};
        *(end - 1) = '\t';
        // Shrink to fit.  Keep the tab though.
        m_buffer.resize(static_cast<std::size_t>(end - data));
      }
      else if constexpr (
        std::is_same_v<Field, std::string> or
        std::is_same_v<Field, std::string_view> or
        std::is_same_v<Field, zview>)
      {
        // This string may need escaping.
        m_field_buf.resize(budget);
        escape_field_to_buffer(f);
      }
      else
      {
        // This field needs to be converted to a string, and after that,
        // escaped as well.
        m_field_buf.resize(budget);
        auto const data{m_field_buf.data()};
        escape_field_to_buffer(
          traits::to_buf(data, data + std::size(m_field_buf), f));
      }
    }
  }

  /// Append string representation for a null field to @c m_buffer.
  /** This special case is for types which are always null.
   *
   * Also appends a tab.  The tab is meant to be a separator, not a terminator,
   * so if you write any fields at all, you'll end up with one tab too many
   * at the end of the buffer.
   */
  template<typename Field>
  std::enable_if_t<nullness<Field>::always_null>
  append_to_buffer(Field const &)
  {
    m_buffer.append(null_field);
  }

  /// Write raw COPY line into @c m_buffer, based on a container of fields.
  template<typename Container>
  std::enable_if_t<not std::is_same_v<typename Container::value_type, char>>
  fill_buffer(Container const &c)
  {
    // To avoid unnecessary allocations and deallocations, we run through c
    // twice: once to determine how much buffer space we may need, and once to
    // actually write it into the buffer.
    std::size_t budget{0};
    for (auto const &f : c) budget += estimate_buffer(f);
    m_buffer.reserve(budget);
    for (auto const &f : c) append_to_buffer(f);
  }

  /// Estimate how many buffer bytes we need to write tuple.
  /** Sums the per-field estimates.  The fold starts from an explicit zero:
   * a unary fold over `+` is ill-formed for an empty pack, which would make
   * an empty tuple fail to compile, whereas an empty container or an empty
   * @c write_values() call is accepted.
   */
  template<typename Tuple, std::size_t... indexes>
  static std::size_t
  budget_tuple(Tuple const &t, std::index_sequence<indexes...>)
  {
    return (std::size_t{0} + ... + estimate_buffer(std::get<indexes>(t)));
  }

  /// Write tuple of fields to @c m_buffer.
  template<typename Tuple, std::size_t... indexes>
  void append_tuple(Tuple const &t, std::index_sequence<indexes...>)
  {
    (append_to_buffer(std::get<indexes>(t)), ...);
  }

  /// Write raw COPY line into @c m_buffer, based on a tuple of fields.
  template<typename... Elts> void fill_buffer(std::tuple<Elts...> const &t)
  {
    using indexes = std::make_index_sequence<sizeof...(Elts)>;

    m_buffer.reserve(budget_tuple(t, indexes{}));
    append_tuple(t, indexes{});
  }

  /// Write raw COPY line into @c m_buffer, based on varargs fields.
  template<typename... Ts> void fill_buffer(const Ts &...fields)
  {
    (..., append_to_buffer(fields));
  }

  /// Name of this class, as a string.
  constexpr static std::string_view s_classname{"stream_to"};
};
/// Deprecated constructor taking the column names as a container.
/** Hands the container's `std::begin()` / `std::end()` range on to the
 * iterator-pair constructor.
 *
 * @param tx The transaction within which the stream will operate.
 * @param table_name Name of the destination table.
 * @param columns Container holding the column names.
 */
template<typename Columns>
inline stream_to::stream_to(
  transaction_base &tx, std::string_view table_name, Columns const &columns) :
        stream_to(tx, table_name, std::begin(columns), std::end(columns))
{}
/// Deprecated constructor taking the column names as an iterator range.
/** Quotes the table name and every column name through the transaction,
 * joins the quoted column names with commas, and delegates to the private
 * constructor that takes a pre-quoted path and a pre-quoted columns list.
 *
 * @param tx The transaction within which the stream will operate.
 * @param table_name Name of the destination table; gets quoted here.
 * @param columns_begin Iterator to the first column name.
 * @param columns_end Iterator just past the last column name.
 */
template<typename Iter>
inline stream_to::stream_to(
  transaction_base &tx, std::string_view table_name, Iter columns_begin,
  Iter columns_end) :
        stream_to{
          tx, tx.quote_name(table_name),
          // The columns list must arrive as an actual std::string_view.  A
          // std::string here would be an exact match for the deprecated
          // `Columns const &` constructor template, which would then treat
          // the string's characters as column names.  The temporary string
          // lives until the delegated constructor has returned, the same
          // way the temporaries in the table() factories do.
          std::string_view{separated_list(
            ",", columns_begin, columns_end,
            [&tx](auto col) { return tx.quote_name(*col); })}}
{}
} // namespace pqxx
#endif