Database Sync

The database sync framework allows tables between two databases with just a few lines of code. Currently supported database types are:

The DBSync class is not a connector, but rather a class that joins in database classes and moves data seamlessly between them.

Quick Start

Full Sync Of Tables

Copy all data from a source table to a destination table.

# Create source and destination database objects
source_rs = Redshift()
destination_rs = Postgres()

# Create db sync object and run sync.
db_sync = DBSync(source_rs, destination_rs) # Create DBSync Object
db_sync.table_sync_full('parsons.source_data', 'parsons.destination_data')

Incremental Sync of Tables

Copy just new data in the table. Utilize this method for tables with distinct primary keys.

# Create source and destination database objects
source_rs = Postgres()
destination_rs = Postgres()

# Create db sync object and run sync.
db_sync = DBSync(source_pg, destination_pg) # Create DBSync Object
db_sync.table_sync_incremental('parsons.source_data', 'parsons.destination_data', 'myid')

API

class parsons.DBSync(source_db, destination_db, read_chunk_size=100000, write_chunk_size=None, retries=0)[source]

Sync tables between databases. Works with Postgres, Redshift, MySQL databases.

Args:
source_db: Database connection object

A database object.

destination_db: Database connection object

A database object.

read_chunk_size: int

The number of rows to read from the source at a time when syncing a table. The default value is 100,000 rows.

write_chunk_size: int

The number of rows to batch up before writing out to the destination. This value defaults to whatever the read_chunk_size is.

retries: int

The number of times to retry if there is an error processing a chunk of data. The default value is 0.

Returns:

A DBSync object.

table_sync_full(source_table, destination_table, if_exists='drop', order_by=None, verify_row_count=True, **kwargs)[source]

Full sync of table from a source database to a destination database. This will wipe all data from the destination table.

Args:
source_table: str

Full table path (e.g. my_schema.my_table)

destination_table: str

Full table path (e.g. my_schema.my_table)

if_exists: str

If destination table exists either drop, truncate, or drop_if_needed. Truncate is useful when there are dependent views associated with the table. Drop if needed defaults to truncate, but if an error occurs (because a data type or length has changed), it will instead drop.

order_by: str

Name of the column to order rows by to ensure stable sorting of results across chunks.

verify_row_count: bool

Whether or not to verify the count of rows in the source and destination table are the same at the end of the sync.

**kwargs: args

Optional copy arguments for destination database.

Returns:

None

table_sync_incremental(source_table, destination_table, primary_key, distinct_check=True, verify_row_count=True, **kwargs)[source]

Incremental sync of table from a source database to a destination database using an incremental primary key.

Args:
source_table: str

Full table path (e.g. my_schema.my_table)

destination_table: str

Full table path (e.g. my_schema.my_table)

primary_key: str

The name of the primary key. This must be the same for the source and destination table.

distinct_check: bool

Check that the source table primary key is distinct prior to running the sync. If it is not, an error will be raised.

verify_row_count: bool

Whether or not to verify the count of rows in the source and destination table are the same at the end of the sync.

**kwargs: args

Optional copy arguments for destination database.

Returns:

None

copy_rows(source_table_name, destination_table_name, cutoff, order_by, **kwargs)[source]

Copy the rows from the source to the destination.

Args:
source_table_name: str

Full table path (e.g. my_schema.my_table)

destination_table_name: str

Full table path (e.g. my_schema.my_table)

cutoff:

Start value to use as a minimum for incremental updates.

order_by:

Column to use to order the data to ensure a stable sort.

**kwargs: args

Optional copy arguments for destination database.

Returns:

None

create_table(source_table, destination_table)[source]

Create the empty table in the destination database based on the source database schema structure. This method utilizes the Alchemy subclass.