/* Definition of the pqxx::stream_to class.
 *
 * pqxx::stream_to enables optimized batch updates to a database table.
 *
 * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/stream_to.hxx instead.
 *
 * Copyright (c) 2000-2022, Jeroen T. Vermeulen.
 *
 * See COPYING for copyright license.  If you did not receive a file called
 * COPYING with this source code, please notify the distributor of this
 * mistake, or contact the author.
 */
#ifndef PQXX_H_STREAM_TO
#define PQXX_H_STREAM_TO

#if !defined(PQXX_HEADER_PRE)
#  error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
#endif

#include "pqxx/separated_list.hxx"
#include "pqxx/transaction_base.hxx"

namespace pqxx
{
/// Efficiently write data directly to a database table.
/** If you wish to insert rows of data into a table, you can compose INSERT
 * statements and execute them.  But that's slow and tedious, and you need to
 * worry about quoting and escaping the data.
 *
 * If you're just inserting a single row, it probably won't matter much.  You
 * can use prepared or parameterised statements to take care of the escaping
 * for you.  But if you're inserting large numbers of rows you will want
 * something better.
 *
 * Inserting rows one by one using INSERT statements involves a lot of
 * pointless overhead, especially when you are working with a remote database
 * server over the network.  You may end up sending each row over the network
 * as a separate query, and waiting for a reply.  Do it "in bulk" using
 * `stream_to`, and you may find that it goes many times faster.  Sometimes
 * you gain orders of magnitude in speed.
 *
 * Here's how it works: you create a `stream_to` stream to start writing to
 * your table.  You will probably want to specify the columns.  Then, you
 * feed your data into the stream one row at a time.  And finally, you call
 * the stream's @ref complete function to tell it to finalise the operation,
 * wait for completion, and check for errors.
 *
 * (You _must_ complete the stream before committing or aborting the
 * transaction.  The connection is in a special state while the stream is
 * active, where it can't process commands, and can't commit or abort a
 * transaction.)
 *
 * So how do you feed a row of data into the stream?  There are several ways,
 * but the preferred one is to call its @ref write_values.  Pass the field
 * values as arguments.  It doesn't matter what type they are, as long as
 * libpqxx knows how to convert them to PostgreSQL's text format: `int`,
 * `std::string` or `std::string_view`, `float` and `double`, `bool`...
 * lots of basic types are supported.  If some of the values are null, feel
 * free to use `std::optional`, `std::shared_ptr`, or `std::unique_ptr`.
 *
 * The arguments' types don't even have to match the fields' SQL types.  If
 * you want to insert an `int` into a `DECIMAL` column, that's your choice --
 * it will produce a `DECIMAL` value which happens to be integral.  Insert a
 * `float` into a `VARCHAR` column?  That's fine, you'll get a string whose
 * contents happen to read like a number.  And so on.  You can even insert
 * different types of value in the same column on different rows.  If you
 * have a code path where a particular field is always null, just insert
 * `nullptr`.
 *
 * There is another way to insert rows: the `<<` ("shift-left") operator.
 * It's not as fast and it doesn't support variable arguments: each row must
 * be either a `std::tuple` or something iterable, such as a `std::vector`,
 * or anything else with a `begin()` and `end()`.
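 *
 * For example, a minimal bulk insert could look like this.  (The `point`
 * table, its columns, and the data are illustrative only, not part of
 * libpqxx.)
 *
 * ```cxx
 * pqxx::work tx{conn};
 * auto stream{pqxx::stream_to::table(tx, {"point"}, {"x", "y"})};
 * for (int i{0}; i < 100; ++i) stream.write_values(i, i * i);
 * stream.complete();
 * tx.commit();
 * ```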
 *
 * @warning While a stream is active, you cannot execute queries, open a
 * pipeline, etc. on the same transaction.  A transaction can have at most
 * one object of a type derived from @ref pqxx::transaction_focus active on
 * it at a time.
 */
class PQXX_LIBEXPORT stream_to : transaction_focus
{
public:
  /// Stream data to a pre-quoted table and columns.
  /** This factory can be useful when it's not convenient to provide the
   * columns list in the form of a `std::initializer_list`, or when the list
   * of columns is simply not known at compile time.
   *
   * Also use this if you need to create multiple streams using the same
   * table path and/or columns list, and you want to save a bit of work on
   * composing the internal SQL statement for starting the stream.  It lets
   * you compose the string representations for the table path and the
   * columns list, so you can compute these once and then re-use them later.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path Name or path for the table upon which the stream will
   *     operate.  If any part of the table path may contain special
   *     characters or be case-sensitive, quote the path using
   *     pqxx::connection::quote_table().
   * @param columns Columns to which the stream will write.  They should be
   *     comma-separated and, if needed, quoted.  You can produce the string
   *     using pqxx::connection::quote_columns().  If you omit this argument,
   *     the stream will write all columns in the table, in schema order.
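   *
   * A sketch of typical use, with a column list that is only known at run
   * time (the table and column names here are hypothetical):
   *
   * ```cxx
   * std::vector<std::string_view> cols{"id", "name"};
   * auto stream{pqxx::stream_to::raw_table(
   *   tx, tx.conn().quote_table("inventory"), tx.conn().quote_columns(cols))};
   * ```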
   */
  static stream_to raw_table(
    transaction_base &tx, std::string_view path, std::string_view columns = "")
  {
    return {tx, path, columns};
  }

  /// Create a `stream_to` writing to a named table and columns.
  /** Use this to stream data to a table, where the list of columns is known
   * at compile time.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path A @ref table_path designating the target table.
   * @param columns Optionally, the columns to which the stream should write.
   *     If you do not pass this, the stream will write to all columns in the
   *     table, in schema order.
   */
  static stream_to table(
    transaction_base &tx, table_path path,
    std::initializer_list<std::string_view> columns = {})
  {
    auto const &conn{tx.conn()};
    return raw_table(tx, conn.quote_table(path), conn.quote_columns(columns));
  }

#if defined(PQXX_HAVE_CONCEPTS)
  /// Create a `stream_to` writing to a named table and columns.
  /** Use this version to stream data to a table, when the list of columns is
   * not known at compile time.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path A @ref table_path designating the target table.
   * @param columns The columns to which the stream should write.
   */
  template<char_strings COLUMNS>
  static stream_to
  table(transaction_base &tx, table_path path, COLUMNS const &columns)
  {
    auto const &conn{tx.conn()};
    return stream_to::raw_table(
      tx, conn.quote_table(path), conn.quote_columns(columns));
  }

  /// Create a `stream_to` writing to a named table and columns.
  /** Use this version to stream data to a table, when the list of columns is
   * not known at compile time.
   *
   * @param tx The transaction within which the stream will operate.
   * @param path Name or path for the target table.  If it may contain
   *     special characters or be case-sensitive, quote it first, e.g. using
   *     pqxx::connection::quote_table().
   * @param columns The columns to which the stream should write.
   */
  template<char_strings COLUMNS>
  static stream_to
  table(transaction_base &tx, std::string_view path, COLUMNS const &columns)
  {
    return stream_to::raw_table(tx, path, tx.conn().quote_columns(columns));
  }
#endif  // PQXX_HAVE_CONCEPTS

  /// Create a stream, without specifying columns.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   *
   * Fields will be inserted in whatever order the columns have in the
   * database.
   *
   * You'll probably want to specify the columns, so that the mapping between
   * your data fields and the table is explicit in your code, and not hidden
   * in an "implicit contract" between your code and your schema.
   */
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &tx, std::string_view table_name) :
          stream_to{tx, table_name, ""sv}
  {}

  /// Create a stream, specifying column names as a container of strings.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   */
  template<typename Columns>
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &, std::string_view table_name, Columns const &columns);

  /// Create a stream, specifying column names as a sequence of strings.
  /** @deprecated Use @ref table or @ref raw_table as a factory.
   */
  template<typename Iter>
  [[deprecated("Use table() or raw_table() factory.")]] stream_to(
    transaction_base &, std::string_view table_name, Iter columns_begin,
    Iter columns_end);

  ~stream_to() noexcept;

  /// Does this stream still need to @ref complete()?
  [[nodiscard]] constexpr operator bool() const noexcept
  {
    return not m_finished;
  }
  /// Has this stream been through its concluding @c complete()?
  [[nodiscard]] constexpr bool operator!() const noexcept
  {
    return m_finished;
  }

  /// Complete the operation, and check for errors.
  /** Always call this to close the stream in an orderly fashion, even after
   * an error.  (In the case of an error, abort the transaction afterwards.)
   *
   * The only circumstance where it's safe to skip this is after an error, if
   * you're discarding the entire connection.
   */
  void complete();

  /// Insert a row of data.
  /** Returns a reference to the stream, so you can chain the calls.
   *
   * The @c row can be a tuple, or any type that can be iterated.  Each
   * item becomes a field in the row, in the same order as the columns you
   * specified when creating the stream.
   *
   * If you don't already happen to have your fields in the form of a tuple
   * or container, prefer @c write_values.  It's faster and more convenient.
   */
  template<typename Row> stream_to &operator<<(Row const &row)
  {
    write_row(row);
    return *this;
  }

  /// Stream a `stream_from` straight into a `stream_to`.
  /** This can be useful when copying between different databases.  If the
   * source and the destination are on the same database, you'll get better
   * performance doing it all in a regular query.
   */
  stream_to &operator<<(stream_from &);

  /// Insert a row of data, given in the form of a @c std::tuple or container.
  /** The @c row can be a tuple, or any type that can be iterated.  Each
   * item becomes a field in the row, in the same order as the columns you
   * specified when creating the stream.
   *
   * The preferred way to insert a row is @c write_values.
   */
  template<typename Row> void write_row(Row const &row)
  {
    fill_buffer(row);
    write_buffer();
  }

  /// Insert values as a row.
  /** This is the recommended way of inserting data.  Pass your field values,
   * of any convertible type.
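   *
   * A brief sketch (with arbitrary values): wherever an empty
   * `std::optional` appears, the stream writes a null.
   *
   * ```cxx
   * std::optional<int> weight{};  // No value, so this writes as null.
   * stream.write_values(5, "wrench", weight);
   * ```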
   */
  template<typename... Ts> void write_values(Ts const &...fields)
  {
    fill_buffer(fields...);
    write_buffer();
  }

private:
  /// Stream a pre-quoted table name and columns list.
  stream_to(
    transaction_base &tx, std::string_view path, std::string_view columns);

  bool m_finished = false;

  /// Reusable buffer for a row.  Saves doing an allocation for each row.
  std::string m_buffer;

  /// Reusable buffer for converting/escaping a field.
  std::string m_field_buf;

  /// Glyph scanner, for parsing the client encoding.
  internal::glyph_scanner_func *m_scanner;

  /// Write a row of raw text-format data into the destination table.
  void write_raw_line(std::string_view);

  /// Write a row of data from @c m_buffer into the destination table.
  /** Resets the buffer for the next row.
   */
  void write_buffer();

  /// COPY encoding for a null field, plus subsequent separator.
  static constexpr std::string_view null_field{"\\N\t"};

  /// Estimate buffer space needed for a field which is always null.
  template<typename T>
  static std::enable_if_t<nullness<T>::always_null, std::size_t>
  estimate_buffer(T const &)
  {
    return std::size(null_field);
  }

  /// Estimate buffer space needed for field f.
  /** The estimate is not very precise.  We don't actually know how much
   * space we'll need once the escaping comes in.
   */
  template<typename T>
  static std::enable_if_t<not nullness<T>::always_null, std::size_t>
  estimate_buffer(T const &field)
  {
    return is_null(field) ? std::size(null_field) : size_buffer(field);
  }

  /// Append escaped version of @c data to @c m_buffer, plus a tab.
  void escape_field_to_buffer(std::string_view data);

  /// Append string representation for @c f to @c m_buffer.
  /** This is for the general case, where the field may contain a value.
   *
   * Also appends a tab.  The tab is meant to be a separator, not a
   * terminator, so if you write any fields at all, you'll end up with one
   * tab too many at the end of the buffer.
   */
  template<typename Field>
  std::enable_if_t<not nullness<Field>::always_null>
  append_to_buffer(Field const &f)
  {
    // We append each field, terminated by a tab.  That will leave us with
    // one tab too many, assuming we write any fields at all; we remove that
    // at the end.
    if (is_null(f))
    {
      // Easy.  Append null and tab in one go.
      m_buffer.append(null_field);
    }
    else
    {
      // Convert f into m_buffer.

      using traits = string_traits<Field>;
      auto const budget{estimate_buffer(f)};
      auto const offset{std::size(m_buffer)};

      if constexpr (std::is_arithmetic_v<Field>)
      {
        // Specially optimised for "safe" types, which never need any
        // escaping.  Convert straight into m_buffer.

        // The budget we get from size_buffer() includes room for the
        // trailing zero, which we must remove.  But we're also inserting
        // tabs between fields, so we re-purpose the extra byte for that.
        auto const total{offset + budget};
        m_buffer.resize(total);
        auto const data{m_buffer.data()};
        char *const end{traits::into_buf(data + offset, data + total, f)};
        *(end - 1) = '\t';

        // Shrink to fit.  Keep the tab though.
        m_buffer.resize(static_cast<std::size_t>(end - data));
      }
      else if constexpr (
        std::is_same_v<Field, std::string> or
        std::is_same_v<Field, std::string_view> or
        std::is_same_v<Field, zview>)
      {
        // This string may need escaping.
        m_field_buf.resize(budget);
        escape_field_to_buffer(f);
      }
      else
      {
        // This field needs to be converted to a string, and after that,
        // escaped as well.
        m_field_buf.resize(budget);
        auto const data{m_field_buf.data()};
        escape_field_to_buffer(
          traits::to_buf(data, data + std::size(m_field_buf), f));
      }
    }
  }

  /// Append string representation for a null field to @c m_buffer.
  /** This special case is for types which are always null.
   *
   * Also appends a tab.  The tab is meant to be a separator, not a
   * terminator, so if you write any fields at all, you'll end up with one
   * tab too many at the end of the buffer.
   */
  template<typename Field>
  std::enable_if_t<nullness<Field>::always_null>
  append_to_buffer(Field const &)
  {
    m_buffer.append(null_field);
  }
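  // Illustration, not itself part of the interface: after appending the
  // fields 1, "a b", and a null, m_buffer holds the COPY text
  // "1\ta b\t\\N\t" -- each field followed by a tab separator, with the
  // surplus trailing tab dealt with when the row is written out.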
  /// Write raw COPY line into @c m_buffer, based on a container of fields.
  /** Only enabled for types that define a `value_type`, i.e. containers,
   * so that the `std::tuple` overload gets to handle tuples.
   */
  template<typename Container>
  std::enable_if_t<std::is_object_v<typename Container::value_type>>
  fill_buffer(Container const &c)
  {
    // To avoid unnecessary allocations and deallocations, we run through c
    // twice: once to determine how much buffer space we may need, and once
    // to actually write it into the buffer.
    std::size_t budget{0};
    for (auto const &f : c) budget += estimate_buffer(f);
    m_buffer.reserve(budget);
    for (auto const &f : c) append_to_buffer(f);
  }

  /// Estimate how many buffer bytes we need to write tuple.
  template<typename Tuple, std::size_t... indexes>
  static std::size_t
  budget_tuple(Tuple const &t, std::index_sequence<indexes...>)
  {
    return (estimate_buffer(std::get<indexes>(t)) + ...);
  }

  /// Write tuple of fields to @c m_buffer.
  template<typename Tuple, std::size_t... indexes>
  void append_tuple(Tuple const &t, std::index_sequence<indexes...>)
  {
    (append_to_buffer(std::get<indexes>(t)), ...);
  }

  /// Write raw COPY line into @c m_buffer, based on a tuple of fields.
  template<typename... Ts> void fill_buffer(std::tuple<Ts...> const &t)
  {
    using indexes = std::make_index_sequence<sizeof...(Ts)>;
    m_buffer.reserve(budget_tuple(t, indexes{}));
    append_tuple(t, indexes{});
  }

  /// Write raw COPY line into @c m_buffer, based on varargs fields.
  template<typename... Ts> void fill_buffer(const Ts &...fields)
  {
    (..., append_to_buffer(fields));
  }

  constexpr static std::string_view s_classname{"stream_to"};
};


template<typename Columns>
inline stream_to::stream_to(
  transaction_base &tx, std::string_view table_name, Columns const &columns) :
        stream_to{tx, table_name, std::begin(columns), std::end(columns)}
{}


template<typename Iter>
inline stream_to::stream_to(
  transaction_base &tx, std::string_view table_name, Iter columns_begin,
  Iter columns_end) :
        stream_to{
          tx, tx.quote_name(table_name),
          separated_list(",", columns_begin, columns_end, [&tx](auto col) {
            return tx.quote_name(*col);
          })}
{}
} // namespace pqxx
#endif