Postgres

Postgres is popular open source SQL database dialect. The Parsons class leverages the psycopg2 python package.

Quickstart

Instantiate Postgres from environmental variables
from parsons import Postgres
pg = Postgres()
Instantiate Postgres from passed variables
from parsons import Postgres
pg = Postgres(username='me', password='secret', host='mydb.com', db='dev', port=3306)
Instantiate Postgres from a ~/.pgpass file
from parsons import Postgres
pg = Postgres()
Query database
tbl = pg.query('select * from my_schema.secret_sauce')
Copy data to database
tbl = Table.from_csv('my_file.csv') # Load from a CSV or other source.
pg.copy(tbl, 'my_schema.winning_formula')

API

class parsons.databases.postgres.postgres.Postgres(username=None, password=None, host=None, db=None, port=None, timeout=10)[source]

A Postgres class to connect to database. Credentials can be passed from a .pgpass file stored in your home directory or with environmental variables.

Parameters:
  • username – str Required if env variable PGUSER not populated

  • password – str Required if env variable PGPASSWORD not populated

  • host – str Required if env variable PGHOST not populated

  • db – str Required if env variable PGDATABASE not populated

  • port – int If omitted or None, uses PGPORT when set, otherwise 5432. If passed (including 5432), the argument takes precedence over PGPORT.

  • timeout – int Seconds to timeout if connection not established.

copy(tbl: Table, table_name: str, if_exists: Literal['fail', 'append', 'drop', 'truncate'] = 'fail', strict_length: bool = False)[source]

Copy a Table to Postgres.

Parameters:
  • tbl (Table) – Table A Parsons table object

  • table_name (str) – str The destination schema and table (e.g. my_schema.my_table)

  • if_exists (Literal['fail', 'append', 'drop', 'truncate']) – str If the table already exists, either fail, append, drop or truncate the table.

  • strict_length (bool) – bool If the database table needs to be created, strict_length determines whether the created table’s column sizes will be sized to exactly fit the current data, or if their size will be rounded up to account for future values being larger then the current dataset. Defaults to False.

connection()

Generate a Postgres connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.

When using the connection, make sure to put it in a with block (necessary for any context manager): with pg.connection() as conn:

Yields:

Psycopg2 connection object

create_table(table_object, table_name)

Create a table based on table object data.

detect_data_type(value, cmp_type=None)

Detect the higher of value’s type cmp_type.

  1. check if it’s a string

  2. check if it’s a number

    1. check if it’s a float

    2. check if it’s an int

Parameters:
  • value – str The value to inspect.

  • cmp_type – str The string representation of a type to compare with the type of value.

Returns:

str

String representation of the higher of the two types.

format_column(col: str, index: int | str = '', replace_chars: dict | None = None, col_prefix: str = '_') str

Format the column to meet database contraints.

Formats the columns as follows: 1. Coverts to lowercase (if case insensitive) 2. Strips leading and trailing whitespace 3. Replaces invalid characters 4. Renames if in reserved words

Parameters:
  • col (str) – The column to format.

  • index (int | str) – The index of the column. Used if the column is empty.

  • replace_chars (dict | None) – A dictionary of invalid characters and their replacements. If None uses {” “: “_”}

  • col_prefix (str) – The prefix to use when the column is empty or starts with an invalid character.

Returns:

The formatted column.

Return type:

str

format_columns(cols, **kwargs)

Format the columns to meet database contraints.

This method relies on format_column to handle most changes. It only handles duplicated columns. Options to format_column can be passed through kwargs.

Parameters:
  • cols – list The columns to format.

  • **kwargs – dicts Keyword arguments to pass to format_column.

Returns:

list

The formatted columns.

generate_alchemy_url()

Generate a SQL Alchemy engine https://docs.sqlalchemy.org/en/14/core/engines.html#

generate_engine()

Generate a SQL Alchemy engine.

get_bigger_int(int1, int2)

Return the bigger of the two ints.

Parameters:
  • int1 – str The string representation if an int type.

  • int2 – str The string representation if an int type.

Returns:

str

A string representation of the higher of the two int types.

get_table_object(table_name)

Get a SQL Alchemy table object.

is_valid_sql_num(val)

Check whether val is a valid sql number.

Parameters:

val – any The values to check.

Returns:

bool

Whether or not the value is a valid sql number.

query(sql: str, parameters: list | None = None) Table | None

Execute a query against the database. Will return None if the query returns zero rows.

To include python variables in your query, it is recommended to pass them as parameters, following the psycopg style. Using the parameters argument ensures that values are escaped properly, and avoids SQL injection attacks.

Parameter Examples

# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
rs.query(sql, parameters=[name])
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
rs.query(sql, parameters=names)
Parameters:
  • sql (str) – str A valid SQL statement

  • parameters (list | None) – list A list of python variables to be converted into SQL values in your query

Returns:

Table

See Table for output options.

Return type:

Table | None

query_with_connection(sql, connection, parameters=None, commit=True)

Execute a query against the database, with an existing connection. Useful for batching queries together. Will return None if the query returns zero rows.

Parameters:
  • sql – str A valid SQL statement

  • connection – obj A connection object obtained from redshift.connection()

  • parameters – list A list of python variables to be converted into SQL values in your query

  • commit – boolean Whether to commit the transaction immediately. If False the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with connection.commit()).

Returns:

Table

See Table for output options.

static split_table_name(full_table_name: str) tuple[str, str] | None

Parse the schema and table name.

Returns:

tuple[str, str]

Parameters:

full_table_name (str)

Return type:

tuple[str, str] | None

table_exists(table_name: str, view: bool = True) bool

Check if a table or view exists in the database.

Parameters:
  • table_name (str) – str The table name and schema (e.g. myschema.mytable).

  • view (bool) – boolean Check to see if a view exists by the same name. Defaults to True.

Returns:

boolean

True if the table exists and False if it does not.

Return type:

bool