Databases

Google BigQuery

See the Google section for documentation.

MySQL

class parsons.MySQL(host=None, username=None, password=None, db=None, port=3306)[source]

Connect to a MySQL database.

Args:
username: str

Required if env variable MYSQL_USERNAME not populated

password: str

Required if env variable MYSQL_PASSWORD not populated

host: str

Required if env variable MYSQL_HOST not populated

db: str

Required if env variable MYSQL_DB not populated

port: int

Can be set by env variable MYSQL_PORT or argument.

connection()[source]

Generate a MySQL connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.

When using the connection, make sure to put it in a with block (necessary for any context manager): with mysql.connection() as conn:

Returns:

MySQL connection object
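The commit-on-exit behavior described above follows the standard Python DB-API context-manager pattern. A minimal sketch of the same idea using the stdlib sqlite3 module as a stand-in (sqlite3 is not Parsons or MySQL — it only illustrates the semantics):

```python
import sqlite3
from contextlib import closing

# Open an in-memory database as a stand-in for mysql.connection().
with closing(sqlite3.connect(":memory:")) as conn:
    # sqlite3 connections are also context managers: the inner `with`
    # commits the transaction on success and rolls back on error.
    with conn:
        conn.execute("CREATE TABLE people (name TEXT)")
        conn.execute("INSERT INTO people VALUES (?)", ("Beatrice O'Brady",))
    rows = conn.execute("SELECT name FROM people").fetchall()
```

Statements run inside the with block are committed when the block exits, which is exactly the guarantee the Parsons connection gives you.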

query(sql, parameters=None)[source]

Execute a query against the database. Will return None if the query returns zero rows.

To include python variables in your query, pass them via the parameters argument, following MySQL's %s placeholder style. This ensures that values are escaped properly and avoids SQL injection attacks.

Parameter Examples

# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
mysql.query(sql, parameters=[name])

# Pass a list of values by building one placeholder per item.
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
mysql.query(sql, parameters=names)
Args:
sql: str

A valid SQL statement

parameters: list

A list of python variables to be converted into SQL values in your query

Returns:
Parsons Table

See Parsons Table for output options.

query_with_connection(sql, connection, parameters=None, commit=True)[source]

Execute a query against the database, with an existing connection. Useful for batching queries together. Will return None if the query returns zero rows.

Args:
sql: str

A valid SQL statement

connection: obj

A connection object obtained from mysql.connection()

parameters: list

A list of python variables to be converted into SQL values in your query

commit: boolean

Whether to commit the transaction immediately. If False the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with connection.commit()).

Returns:
Parsons Table

See Parsons Table for output options.
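The batching pattern that query_with_connection enables — several statements on one connection, committed together — can be sketched with stdlib sqlite3 standing in for a MySQL connection (illustrative only, not the Parsons API):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE donations (donor TEXT, amount INTEGER)")

# Run several statements on one connection, like repeated
# query_with_connection(..., commit=False) calls...
for donor, amount in [("Allen", 10), ("Beatrice", 25), ("Cathy", 40)]:
    cur.execute("INSERT INTO donations VALUES (?, ?)", (donor, amount))

# ...then commit once, like letting the connection close
# (or passing commit=True on the final call).
conn.commit()

total = cur.execute("SELECT SUM(amount) FROM donations").fetchone()[0]
conn.close()
```

Deferring the commit this way keeps the whole batch in one transaction, so either all of the statements take effect or none do.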

copy(tbl, table_name, if_exists='fail', chunk_size=1000)[source]

Copy a Parsons Table to the database.

Note

This method uses extended inserts rather than LOAD DATA INFILE, since many MySQL database configurations do not allow data files to be loaded. This results in a minor performance hit compared to LOAD DATA.

Args:
tbl: parsons.Table

A Parsons table object

table_name: str

The destination schema and table (e.g. my_schema.my_table)

if_exists: str

If the table already exists, either fail, append, drop or truncate the table.

chunk_size: int

The number of rows to insert per query.
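The chunk_size parameter controls how many rows go into each extended INSERT statement. A hypothetical sketch of the batching logic (not the actual Parsons implementation):

```python
def chunk_rows(rows, chunk_size=1000):
    """Yield successive chunk_size-row batches, one per INSERT statement."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]

# 2,500 rows with the default chunk_size produce three INSERT statements.
rows = [(i, f"person_{i}") for i in range(2500)]
batches = list(chunk_rows(rows, chunk_size=1000))
batch_sizes = [len(b) for b in batches]  # [1000, 1000, 500]
```

A larger chunk_size means fewer round trips but bigger statements; MySQL's max_allowed_packet setting bounds how large each statement can be.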

table_exists(table_name, view=True)[source]

Check if a table or view exists in the database.

Args:
table_name: str

The table name

view: boolean

Check to see if a view exists by the same name

Returns:
boolean

True if the table exists and False if it does not.
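An existence check like this amounts to querying the database catalog. A stand-in sketch with stdlib sqlite3 (MySQL would consult information_schema.tables instead; this is not the Parsons implementation):

```python
import sqlite3

def table_exists(conn, table_name):
    # sqlite3 keeps its catalog in sqlite_master; MySQL would query
    # information_schema.tables with a parameterized name instead.
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    return row is not None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id INTEGER)")
found = table_exists(conn, "my_table")    # True
missing = table_exists(conn, "no_such")   # False
```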

Postgres

class parsons.Postgres(username=None, password=None, host=None, db=None, port=5432, timeout=10)[source]

A Postgres class to connect to a database. Credentials can be passed from a .pgpass file stored in your home directory or with environment variables.

Args:
username: str

Required if env variable PGUSER not populated

password: str

Required if env variable PGPASSWORD not populated

host: str

Required if env variable PGHOST not populated

db: str

Required if env variable PGDATABASE not populated

port: int

Can be set by env variable PGPORT or argument; defaults to 5432.

timeout: int

Seconds to wait before timing out if a connection is not established.

copy(tbl, table_name, if_exists='fail')[source]

Copy a Parsons Table to Postgres.

Args:
tbl: parsons.Table

A Parsons table object

table_name: str

The destination schema and table (e.g. my_schema.my_table)

if_exists: str

If the table already exists, either fail, append, drop or truncate the table.

connection()

Generate a Postgres connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.

When using the connection, make sure to put it in a with block (necessary for any context manager): with pg.connection() as conn:

Returns:

Psycopg2 connection object

query(sql, parameters=None)

Execute a query against the database. Will return None if the query returns zero rows.

To include python variables in your query, pass them via the parameters argument, following psycopg's %s placeholder style. This ensures that values are escaped properly and avoids SQL injection attacks.

Parameter Examples

# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
pg.query(sql, parameters=[name])

# Pass a list of values by building one placeholder per item.
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
pg.query(sql, parameters=names)
Args:
sql: str

A valid SQL statement

parameters: list

A list of python variables to be converted into SQL values in your query

Returns:
Parsons Table

See Parsons Table for output options.

query_with_connection(sql, connection, parameters=None, commit=True)

Execute a query against the database, with an existing connection. Useful for batching queries together. Will return None if the query returns zero rows.

Args:
sql: str

A valid SQL statement

connection: obj

A connection object obtained from pg.connection()

parameters: list

A list of python variables to be converted into SQL values in your query

commit: boolean

Whether to commit the transaction immediately. If False the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with connection.commit()).

Returns:
Parsons Table

See Parsons Table for output options.

table_exists(table_name, view=True)

Check if a table or view exists in the database.

Args:
table_name: str

The table name and schema (e.g. myschema.mytable).

view: boolean

Check to see if a view exists by the same name

Returns:
boolean

True if the table exists and False if it does not.

Redshift

See Amazon Web Services section for documentation.

Database Sync

Sync tables between two databases with just a few lines of code. Currently supported database types are:

  • Google BigQuery

  • MySQL

  • Postgres

  • Redshift

Examples

Full Sync Of Tables

Copy all data from a source table to a destination table.

from parsons import Redshift, Postgres, DBSync

# Create source and destination database objects
source_rs = Redshift()
destination_pg = Postgres()

# Create db sync object and run sync.
db_sync = DBSync(source_rs, destination_pg)
db_sync.table_sync_full('parsons.source_data', 'parsons.destination_data')

Incremental Sync of Tables

Copy only new data in the table. Use this method for tables with a distinct, incrementing primary key.

from parsons import Postgres, DBSync

# Create source and destination database objects
source_pg = Postgres()
destination_pg = Postgres()

# Create db sync object and run sync.
db_sync = DBSync(source_pg, destination_pg)
db_sync.table_sync_incremental('parsons.source_data', 'parsons.destination_data', 'myid')
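Conceptually, an incremental sync copies only rows whose primary key exceeds the destination's current maximum. A simplified sketch over plain lists (not the Parsons internals):

```python
def incremental_sync(source_rows, dest_rows, key="myid"):
    """Copy rows from source whose primary key is past the destination max."""
    max_key = max((r[key] for r in dest_rows), default=0)
    new_rows = [r for r in source_rows if r[key] > max_key]
    dest_rows.extend(new_rows)
    return len(new_rows)

source = [{"myid": i} for i in range(1, 6)]   # rows 1..5
dest = [{"myid": i} for i in range(1, 4)]     # rows 1..3 already synced
copied = incremental_sync(source, dest)       # copies rows 4 and 5
```

This is why the primary key must be distinct and monotonically increasing: rows inserted with keys at or below the destination maximum would be silently skipped.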

API

class parsons.DBSync(source_db, destination_db, chunk_size=100000)[source]

Sync tables between databases. Works with the Postgres, Redshift, MySQL, and Google BigQuery connectors.

Args:
source_db: Database connection object

A database object.

destination_db: Database connection object

A database object.

chunk_size: int

The number of rows per transaction copy when syncing a table. The default value is 100,000 rows.

Returns:

A DBSync object.

table_sync_full(source_table, destination_table, if_exists='drop', **kwargs)[source]

Full sync of table from a source database to a destination database. This will wipe all data from the destination table.

Args:
source_table: str

Full table path (e.g. my_schema.my_table)

destination_table: str

Full table path (e.g. my_schema.my_table)

if_exists: str

If the destination table exists, either drop or truncate it. Truncate is useful when there are dependent views associated with the table.

**kwargs: args

Optional copy arguments for destination database.

Returns:

None

table_sync_incremental(source_table, destination_table, primary_key, distinct_check=True, **kwargs)[source]

Incremental sync of table from a source database to a destination database using an incremental primary key.

Args:
source_table: str

Full table path (e.g. my_schema.my_table)

destination_table: str

Full table path (e.g. my_schema.my_table)

primary_key: str

The name of the primary key. This must be the same for the source and destination table.

distinct_check: bool

Check that the source table primary key is distinct prior to running the sync. If it is not, an error will be raised.

**kwargs: args

Optional copy arguments for destination database.

Returns:

None