Source code for parsons.databases.db_sync

import logging

logger = logging.getLogger(__name__)


class DBSync:
    """
    Sync tables between databases. Works with ``Postgres``, ``Redshift``,
    ``MySQL`` databases.

    `Args:`
        source_db: Database connection object
            A database object.
        destination_db: Database connection object
            A database object.
        chunk_size: int
            The number of rows per transaction copy when syncing a table. The
            default value is 100,000 rows.
    `Returns:`
        A DBSync object.
    """

    def __init__(self, source_db, destination_db, chunk_size=100000):

        self.source_db = source_db
        self.dest_db = destination_db
        self.chunk_size = chunk_size
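
    # A minimal usage sketch (not part of the original module): constructing a
    # DBSync from two Parsons database connectors. Connector setup is illustrative;
    # it assumes credentials are supplied through each connector's environment
    # variables.
    #
    #     from parsons import Postgres, Redshift
    #
    #     source_db = Postgres()       # picks up PG* credentials from the environment
    #     destination_db = Redshift()  # picks up Redshift credentials from the environment
    #     db_sync = DBSync(source_db, destination_db, chunk_size=50000)
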
    def table_sync_full(self, source_table, destination_table, if_exists='drop',
                        **kwargs):
        """
        Full sync of table from a source database to a destination database. This will
        wipe all data from the destination table.

        `Args:`
            source_table: str
                Full table path (e.g. ``my_schema.my_table``)
            destination_table: str
                Full table path (e.g. ``my_schema.my_table``)
            if_exists: str
                If destination table exists either ``drop`` or ``truncate``. Truncate is
                useful when there are dependent views associated with the table.
            **kwargs: args
                Optional copy arguments for destination database.
        `Returns:`
            ``None``
        """

        # Create the table objects
        source_tbl = self.source_db.table(source_table)
        destination_tbl = self.dest_db.table(destination_table)

        logger.info(f'Syncing full table data from {source_table} to {destination_table}')

        # Drop or truncate if the destination table exists
        if destination_tbl.exists:
            if if_exists == 'drop':
                destination_tbl.drop()
            elif if_exists == 'truncate':
                self._check_column_match(source_tbl, destination_tbl)
                destination_tbl.truncate()
            else:
                raise ValueError('Invalid if_exists argument. Must be drop or truncate.')

        # If the source table contains 0 rows, do not attempt to copy the table.
        if source_tbl.num_rows == 0:
            logger.info('Source table contains 0 rows')
            return None

        # Copy rows in chunks.
        copied_rows = 0
        while copied_rows < source_tbl.num_rows:

            # Get a chunk
            rows = source_tbl.get_rows(offset=copied_rows, chunk_size=self.chunk_size)

            # Copy the chunk
            self.dest_db.copy(rows, destination_table, if_exists='append', **kwargs)

            # Update counter
            copied_rows += rows.num_rows

        logger.info(f'{source_table} synced: {copied_rows} total rows copied.')

        self._row_count_verify(source_tbl, destination_tbl)
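
    # A minimal sketch of a full sync (table names are hypothetical). Using
    # if_exists='truncate' keeps dependent views on the destination table intact,
    # at the cost of requiring that the two tables' columns match.
    #
    #     db_sync.table_sync_full('staging.contacts', 'analytics.contacts',
    #                             if_exists='truncate')
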
    def table_sync_incremental(self, source_table, destination_table, primary_key,
                               distinct_check=True, **kwargs):
        """
        Incremental sync of table from a source database to a destination database
        using an incremental primary key.

        `Args:`
            source_table: str
                Full table path (e.g. ``my_schema.my_table``)
            destination_table: str
                Full table path (e.g. ``my_schema.my_table``)
            primary_key: str
                The name of the primary key. This must be the same for the source and
                destination table.
            distinct_check: bool
                Check that the source table primary key is distinct prior to running the
                sync. If it is not, an error will be raised.
            **kwargs: args
                Optional copy arguments for destination database.
        `Returns:`
            ``None``
        """

        # Create the table objects
        source_tbl = self.source_db.table(source_table)
        destination_tbl = self.dest_db.table(destination_table)

        # Check that the destination table exists. If it does not, then run a
        # full sync instead.
        if not destination_tbl.exists:
            self.table_sync_full(source_table, destination_table)
            return None

        # If the source table contains 0 rows, do not attempt to copy the table.
        if source_tbl.num_rows == 0:
            logger.info('Source table contains 0 rows')
            return None

        # Check that the source table primary key is distinct
        if distinct_check and not source_tbl.distinct_primary_key(primary_key):
            raise ValueError(f'{primary_key} is not distinct in source table.')

        # Get the max source table and destination table primary key
        source_max_pk = source_tbl.max_primary_key(primary_key)
        dest_max_pk = destination_tbl.max_primary_key(primary_key)

        # Check for a primary key mismatch; if dest_max_pk is None, the destination
        # table is empty and we don't have to worry about this check.
        if dest_max_pk is not None and dest_max_pk > source_max_pk:
            raise ValueError('Destination DB primary key greater than source DB primary key.')

        # Do not copy anything if the max primary keys are equal.
        elif dest_max_pk == source_max_pk:
            logger.info('Tables are in sync.')
            return None

        else:
            # Get the count of rows to be copied.
            if dest_max_pk is not None:
                new_row_count = source_tbl.get_new_rows_count(primary_key, dest_max_pk)
            else:
                new_row_count = source_tbl.num_rows

            logger.info(f'Found {new_row_count} new rows in source table.')

            copied_rows = 0

            # Copy rows in chunks.
            while True:

                # Get a chunk
                rows = source_tbl.get_new_rows(primary_key=primary_key,
                                               cutoff_value=dest_max_pk,
                                               offset=copied_rows,
                                               chunk_size=self.chunk_size)

                row_count = rows.num_rows if rows else 0

                # If there are no more rows to copy, we are done.
                if row_count == 0:
                    break

                # Copy the chunk
                self.dest_db.copy(rows, destination_table, if_exists='append', **kwargs)

                # Update the counter
                copied_rows += row_count

        self._row_count_verify(source_tbl, destination_tbl)

        logger.info(f'{source_table} synced to {destination_table}.')
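
    # A minimal sketch of an incremental sync (table and key names are
    # hypothetical). Only rows whose primary key exceeds the destination's current
    # maximum are copied, so the key must be monotonically increasing for this
    # strategy to be correct.
    #
    #     db_sync.table_sync_incremental('staging.contacts', 'analytics.contacts',
    #                                    primary_key='id')
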
    def _row_count_verify(self, source_table_obj, destination_table_obj):
        """
        Ensure that the row counts of the source table and the destination table match.
        """

        source_row_count = source_table_obj.num_rows
        dest_row_count = destination_table_obj.num_rows

        if source_row_count != dest_row_count:
            logger.warning(f'Table count mismatch. Source table contains {source_row_count}.'
                           f' Destination table contains {dest_row_count}.')
            return False

        logger.info('Source and destination table row counts match.')
        return True

    def _check_column_match(self, source_table_obj, destination_table_obj):
        """
        Ensure that the columns from each table match.
        """

        if source_table_obj.columns != destination_table_obj.columns:
            raise ValueError('Destination table columns do not match source table columns. '
                             'Consider dropping the destination table and running a full sync.')