Moving a lot of data between databases

Using PostgreSQL's COPY command and a custom stream interface to load data from a MySQL query result

Posted by Agustín Bartó 2 years, 6 months ago

Motivation

During one of our latest projects, we had to do some Data Warehousing for a client with a fairly large dataset. Everything is stored in a MySQL cluster, and because of the sensitive nature of the data we were only given partial access, through views based on the actual data.

We decided to implement the warehouse using PostgreSQL (we also had to build a Django site on top of it, so it was the most natural choice). Initially everything went fine, but as the dataset grew, moving the data from MySQL to PostgreSQL became challenging and time consuming.

In the past we have had to handle even larger datasets, but never coming from a database, so we had to adapt our solution to the new problem. We want to share with the community some of the components of the solution we came up with.

RowsTextIO

One of the fastest ways to load data into a PostgreSQL table is the COPY SQL command, which copies data from a file or a stream directly into the table. It is often used to load large amounts of data from a foreign source (usually CSV files) into the database. The command is also available through psycopg (PostgreSQL's Python driver), exposed as the copy_from and copy_expert methods of the cursor class.
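
For reference, this is roughly what bulk-loading a plain CSV file through psycopg looks like (the file name, table name and connection settings below are made up for the example):

import psycopg2

connection = psycopg2.connect(**connection_settings)
cursor = connection.cursor()

with open('rows.csv') as csv_file:
    # COPY ... FROM STDIN makes PostgreSQL read rows from the stream that
    # psycopg feeds it, which is much faster than row-by-row INSERTs.
    cursor.copy_expert('COPY staging_table FROM STDIN CSV', csv_file)

connection.commit()
cursor.close()
connection.close()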

rowstextio provides RowsTextIO, a read-only, unicode, character- and line-based stream interface over the result of a database query. The stream can be passed to psycopg's copy_from or copy_expert cursor methods to load the data into the target table.

Usage

The following session shows the typical use case for the package.

>>> import psycopg2
>>> import mysql.connector
>>> source_connection = mysql.connector.connect(**source_connection_settings)
>>> target_connection = psycopg2.connect(**target_connection_settings)
>>> from rowstextio import RowsTextIO
>>> source_cursor = source_connection.cursor()
>>> target_cursor = target_connection.cursor()
>>> f = RowsTextIO(source_cursor, 'SELECT * FROM source_table WHERE id <> %(id)s', {'id': 42})
>>> target_cursor.copy_expert('COPY target_table FROM STDIN CSV', f)
>>> target_connection.commit()
>>> target_cursor.close()
>>> source_cursor.close()

Assuming that the target table schema is compatible with the rows resulting from the query, the data should be loaded by now.
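
Here "compatible" means that target_table has one column for each column returned by the query, in the same order, with types that PostgreSQL can parse from the CSV text the stream produces. A hypothetical target table (the column names and types are invented for the example) could be created beforehand like this:

import psycopg2

target_connection = psycopg2.connect(**target_connection_settings)
target_cursor = target_connection.cursor()
# Invented columns: they must line up with the columns returned by the query.
target_cursor.execute('''
    CREATE TABLE target_table (
        id integer,
        name text,
        created_at timestamp
    )
''')
target_connection.commit()
target_cursor.close()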

How does it work?

It works by requesting a fixed number of rows from the source query to populate a buffer, and then serving the client's read requests from that buffer:

def read(self, n=None):
    read_buffer = StringIO()

    if n is not None:
        # Read a fixed amount of bytes

        left_to_read = n
        while left_to_read > 0:
            # Keep reading from the buffer

            read_from_buffer = self._buffer.read(left_to_read)

            if len(read_from_buffer) > 0:
                read_buffer.write(unicode(read_from_buffer))
            elif len(read_from_buffer) < left_to_read:
                # The buffer ran dry before we read everything. Fetch more rows.

                self._fetch_rows()
                self._write_rows_onto_buffer()

                if len(self._buffer.getvalue()) == 0L:
                    # There are no more rows, break the loop
                    break

            left_to_read -= len(read_from_buffer)
    else:
        # Read all the rows

        while True:
            read_from_buffer = self._buffer.read()

            if len(read_from_buffer) > 0:
                read_buffer.write(read_from_buffer)
            else:
                # We emptied the buffer. Fetch more rows.

                self._fetch_rows()
                self._write_rows_onto_buffer()

                if len(self._buffer.getvalue()) == 0L:
                    # There are no more rows, break the loop
                    break

    read_result = read_buffer.getvalue()
    read_buffer.close()

    return read_result
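
The _fetch_rows and _write_rows_onto_buffer helpers are not shown above; in essence, they do something along these lines (simplified, with made-up attribute names and without proper CSV quoting):

def _fetch_rows(self):
    # fetchmany() returns at most self._batch_size rows, or an empty
    # list once the result set is exhausted.
    self._rows = self._cursor.fetchmany(self._batch_size)

def _write_rows_onto_buffer(self):
    # Serialize the fetched rows as CSV text into a fresh buffer and
    # rewind it so that read() starts consuming from the beginning.
    # A real implementation would also have to quote and escape values.
    self._buffer = StringIO()
    for row in self._rows:
        self._buffer.write(u','.join(unicode(value) for value in row) + u'\n')
    self._buffer.seek(0)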

Conclusions

This solution gave us the flexibility to load huge amounts of data from complex queries and speed up our ETL process.

As usual, any comments or suggestions are welcome. We haven't tried this with other databases, but we think it should be possible to make it work with any interface that takes a text stream as input. We'd be interested to know if you manage to use it in a different environment.

