Amazon Web Services

Parsons provides utility functions and/or connectors for three different AWS services.

See the documentation for each service for more details.

Lambda

Overview

Parsons’ distribute_task function lets you distribute the processing of a table’s rows across multiple AWS Lambda invocations.

If you process a table inside a single AWS Lambda invocation, the number of rows you can process is limited by Lambda’s time limit (a maximum of 15 minutes at the time of writing).

Based on experience and some napkin math, a workload that could process only about 1,000 rows inside a single AWS Lambda invocation can process roughly 10 million rows with this method.

Rather than converting the table to SQS or using other options, the fastest approach is to upload the table to S3 as a CSV file and then invoke multiple Lambda sub-invocations, each of which is sent a byte range of that file to process.
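The byte-range idea can be illustrated without any AWS calls. The following is a minimal sketch, not the Parsons implementation: csv_byte_ranges and rows_for_range are hypothetical helper names, the CSV lives in memory rather than in S3, and it assumes no quoted field contains a newline.

```python
import csv
import io


def csv_byte_ranges(csv_bytes, group_count):
    """Return (header_bytes, ranges) for groups of at most group_count rows.

    Each range is a half-open (start, end) pair of byte offsets.
    Assumption: no newlines inside quoted fields.
    """
    lines = csv_bytes.splitlines(keepends=True)
    header, rows = lines[0], lines[1:]
    ranges = []
    offset = len(header)
    for i in range(0, len(rows), group_count):
        size = sum(len(row) for row in rows[i:i + group_count])
        ranges.append((offset, offset + size))
        offset += size
    return header, ranges


def rows_for_range(csv_bytes, header, byte_range):
    """Parse only the rows inside one byte range, re-attaching the header."""
    start, end = byte_range
    chunk = header + csv_bytes[start:end]
    return list(csv.DictReader(io.StringIO(chunk.decode("utf-8"))))


data = b"id,name\n1,ann\n2,bob\n3,cy\n4,dee\n5,eve\n"
header, ranges = csv_byte_ranges(data, group_count=2)
first_chunk = rows_for_range(data, header, ranges[0])
```

Each worker only needs the header plus its own slice, so in a real deployment the slice would be fetched from S3 with a ranged read instead of being cut from an in-memory bytes object.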

Using this method requires some setup. You have three tasks:

  1. Define the function that processes rows. Its first argument must take your table’s data, though only a subset of rows will be passed (e.g. def task_for_distribution(table, **kwargs):)

  2. Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs) (either setting the environment variable S3_TEMP_BUCKET or passing a bucket= parameter)

  3. Set up your Lambda handler to include parsons.aws.event_command() (or run and deploy your Lambda with Zappa)

To test locally, pass the argument storage="local". This exercises the distribute_task function but runs the task sequentially and in local memory.

QuickStart

A minimalistic example Lambda handler might look something like this:

from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    for row in table:
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return
    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz') would timeout
    # so instead we:
    distribute_task(table, process_table,
                    bucket='my-temp-s3-bucket',
                    func_kwargs={'foo': 789, 'bar': 'baz'})

API

parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3')[source]

Distribute processing rows in a table across multiple AWS Lambda invocations.

Args:
table: Parsons Table

Table of data whose processing you wish to distribute across Lambda invocations of the func_to_run argument.

func_to_run: function

The function you want to run whose first argument will be a subset of table

bucket: str

The name of the bucket to use for the S3 upload of the whole table. Not required if you set the environment variable S3_TEMP_BUCKET.

func_kwargs: dict

If the function has other arguments to pass along with table then provide them as a dict here. They must all be JSON-able.

func_class: class

If the function is a classmethod or function on a class, then pass the pure class here. E.g. If you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.

func_class_kwargs: dict

If it is a class function, and the class must be instantiated, then pass the kwargs to instantiate the class here. E.g. If you passed ActionKit.bulk_upload_table as the function, then you would pass {‘domain’: …, ‘username’: … etc} here. This must all be JSON-able data.

catch: bool

Lambda will retry running an event three times if there’s an exception. If you want to prevent this, set catch=True and it will catch any errors and stop retries. The error will be in CloudWatch logs with the string “Distribute Error”. This might be important if row actions are not idempotent and a failure in your own function would cause repeats.

group_count: int

Set this to how many rows to process with each Lambda invocation (Default: 100)

storage: str

Debugging option: Defaults to “s3”. To test distribution locally without s3, set to “local”.

Returns:

Debug information – do not rely on the output, as it will change depending on how this method is invoked.
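Two of the arguments above lend themselves to a quick pre-flight check before calling distribute_task: func_kwargs must be JSON-able, and group_count determines roughly how many invocations will be made. The sketch below is an illustration only; preflight is a hypothetical helper, not part of Parsons, and the invocation count is an estimate that assumes one invocation per group of rows.

```python
import json
import math


def preflight(row_count, func_kwargs, group_count=100):
    """Check that func_kwargs survives a JSON round trip, then return an
    estimated number of Lambda invocations for row_count rows."""
    try:
        round_tripped = json.loads(json.dumps(func_kwargs))
    except (TypeError, ValueError) as exc:
        raise ValueError(f"func_kwargs is not JSON-able: {exc}") from exc
    if round_tripped != func_kwargs:
        # e.g. tuples come back as lists and int keys come back as strings
        raise ValueError("func_kwargs changes after a JSON round trip")
    return math.ceil(row_count / group_count)


estimated_invocations = preflight(2500, {"foo": 789, "bar": "baz"})
```

Running a check like this locally surfaces non-serializable values (such as sets or open connections) before any Lambda is invoked.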

parsons.aws.event_command(event, context)[source]

Minimal shim to add to the top of the Lambda handler function to enable distributed tasks.

The rest of this library is compatible with the zappa.async library. If you have deployed your app with Zappa, then you do NOT need to add this shim.

S3

Overview

The S3 class allows interaction with Amazon Web Services’ object storage service to store and access data objects. It is a wrapper around the AWS SDK boto3. It provides methods to upload and download files from S3 as well as manipulate buckets.

Note

Authentication

Access to S3 is controlled through AWS Identity and Access Management (IAM) users in the AWS Management Console. Users can be granted granular access to AWS resources, including S3. IAM users are provisioned keys, which are required to access the S3 class.

QuickStart

S3 credentials can be passed as environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY), stored in an AWS CLI file ~/.aws/credentials, or passed as keyword arguments.

from parsons import S3, Table

# First approach: Pass API credentials via environmental variables or an AWS CLI file
s3 = S3()

# Second approach: Pass API credentials as arguments
s3 = S3(aws_access_key_id='MY_KEY', aws_secret_access_key='MY_SECRET')

# Put an arbitrary file in an S3 bucket
with open('winning_formula.csv') as w:
    s3.put_file('my_bucket', 'winning.csv', w)

# Put a Parsons Table as a CSV using convenience method.
tbl = Table.from_csv('winning_formula.csv')
tbl.to_s3_csv('my_bucket', 'winning.csv')

# Download a csv file and convert to a table
f = s3.get_file('my_bucket', 'my_dir/my_file.csv')
tbl = Table.from_csv(f)

# List buckets that you have access to
s3.list_buckets()

# List the keys in a bucket
s3.list_keys('my_bucket')

API

Redshift

Overview

The Redshift class allows you to interact with an Amazon Redshift relational database. The connector utilizes the psycopg2 Python package under the hood. The core methods focus on input, output and querying of the database.

In addition to the core API integration provided by the Redshift class, Parsons also includes utility functions for managing schemas and tables. See Table and View API and Schema API for more information.

Note

S3 Credentials

Redshift only allows data to be copied to the database via S3. As such, the copy() and copy_s3() methods require S3 credentials and write access on an S3 bucket, which will be used for storing data en route to Redshift. See the API documentation for more information about AWS Redshift authorization.

Whitelisting

Remember to ensure that the IP address from which you are connecting has been whitelisted.

Quickstart

Redshift API credentials can be passed either as environment variables (REDSHIFT_USERNAME, REDSHIFT_PASSWORD, REDSHIFT_HOST, REDSHIFT_DB, and REDSHIFT_PORT) or as keyword arguments. Methods that use COPY require an access key ID and a secret access key, which can also be passed as environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or keyword arguments.

from parsons import Redshift

# Pass credentials as environmental variables
rs = Redshift()

# Pass credentials as keyword arguments
rs = Redshift(username='my_username', password='my_password', host='my_host',
              db='my_db', port='5439')

# Query the Database
table = rs.query('select * from tmc_scratch.test_data')

# Copy a Parsons Table to the Database
table = rs.copy(tbl, 'tmc_scratch.test_table', if_exists='drop')

All of the standard COPY options can be passed as kwargs. See the copy() method for all options.
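When credentials come from the environment, it can help to confirm that they are set before connecting. The following is a small sketch under stated assumptions: missing_redshift_env and redshift_kwargs are hypothetical helpers, not part of Parsons, and the mapping simply pairs the variable names listed in the Quickstart with the keyword arguments shown there.

```python
import os

# Variable names are taken from the Quickstart; the keyword names mirror
# the Redshift(...) call shown there.
REDSHIFT_ENV_TO_KWARG = {
    "REDSHIFT_USERNAME": "username",
    "REDSHIFT_PASSWORD": "password",
    "REDSHIFT_HOST": "host",
    "REDSHIFT_DB": "db",
    "REDSHIFT_PORT": "port",
}


def missing_redshift_env(environ=None):
    """Return the Redshift variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REDSHIFT_ENV_TO_KWARG if not environ.get(name)]


def redshift_kwargs(environ=None):
    """Build keyword arguments from whichever variables are set."""
    environ = os.environ if environ is None else environ
    return {
        kwarg: environ[name]
        for name, kwarg in REDSHIFT_ENV_TO_KWARG.items()
        if environ.get(name)
    }


fake_env = {"REDSHIFT_USERNAME": "my_username", "REDSHIFT_HOST": "my_host"}
missing = missing_redshift_env(fake_env)
kwargs = redshift_kwargs(fake_env)
```

Passing a plain dict, as with fake_env here, keeps the check testable without touching the real environment.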

Core API

Table and View API

Table and view utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftTableUtilities[source]
table_exists(table_name, view=True)[source]

Check if a table or view exists in the database.

Args:
table_name: str

The table name and schema (e.g. myschema.mytable).

view: boolean

Check to see if a view exists by the same name

Returns:
boolean

True if the table exists and False if it does not.

get_row_count(table_name)[source]

Return the row count of a table.

SQL Code

SELECT COUNT(*) FROM myschema.mytable
Args:
table_name: str

The schema and name (e.g. myschema.mytable) of the table.

Returns:

int

rename_table(table_name, new_table_name)[source]

Rename an existing table.

Note

You cannot move schemas when renaming a table. Instead, use the duplicate_table() method.

Args:
table_name: str

Name of existing schema and table (e.g. myschema.oldtable)

new_table_name: str

New name for table with the schema omitted (e.g. newtable).

move_table(source_table, new_table, drop_source_table=False)[source]

Move an existing table in the database. It will inherit encoding, sortkey and distkey. Once run, the source table will be empty. This is more efficient than running "create newtable as select * from oldtable".

For more information see: ALTER TABLE APPEND

Args:
source_table: str

Name of existing schema and table (e.g. my_schema.old_table)

new_table: str

New name of schema and table (e.g. my_schema.newtable)

drop_source_table: boolean

Drop the source table.

Returns:

None
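To make the ALTER TABLE APPEND reference concrete, the sketch below builds the kind of SQL such a move could involve. It is an assumption for illustration, not the statements Parsons actually runs: move_table_statements is a hypothetical helper, and table names are interpolated directly, so they are assumed to be trusted.

```python
def move_table_statements(source_table, new_table, drop_source_table=False):
    """Return illustrative SQL statements for moving a table's rows."""
    statements = [
        # LIKE copies the column definitions along with encoding,
        # distkey and sortkey.
        f"create table {new_table} (like {source_table});",
        # APPEND moves the rows, leaving the source table empty.
        f"alter table {new_table} append from {source_table};",
    ]
    if drop_source_table:
        statements.append(f"drop table {source_table};")
    return statements


statements = move_table_statements(
    "my_schema.old_table", "my_schema.newtable", drop_source_table=True
)
```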

populate_table_from_query(query, destination_table, if_exists='fail', distkey=None, sortkey=None)[source]

Populate a Redshift table with the results of a SQL query, creating the table if it doesn’t yet exist.

Args:
query: str

The SQL query

destination_table: str

Name of destination schema and table (e.g. my_schema.new_table)

if_exists: str

If the table already exists, either fail, append, drop, or truncate the table.

distkey: str

The column to use as the distkey for the table.

sortkey: str

The column to use as the sortkey for the table.

duplicate_table(source_table, destination_table, where_clause='', if_exists='fail', drop_source_table=False)[source]

Create a copy of an existing table (or subset of rows) in a new table. It will inherit encoding, sortkey and distkey.

Args:
source_table: str

Name of existing schema and table (e.g. myschema.oldtable)

destination_table: str

Name of destination schema and table (e.g. myschema.newtable)

where_clause: str

An optional where clause (e.g. where org = 1).

if_exists: str

If the table already exists, either fail, append, drop, or truncate the table.

drop_source_table: boolean

Drop the source table

union_tables(new_table_name, tables, union_all=True, view=False)[source]

Union a series of tables into a new table.

Args:
new_table_name: str

The new table and schema (e.g. myschema.newtable)

tables: list

A list of tables to union

union_all: boolean

If False will deduplicate rows. If True will include duplicate rows.

view: boolean

Create a view rather than a static table

Returns:

None
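The effect of union_all and view can be seen in the SQL they imply. This is a hedged sketch rather than the query Parsons issues: union_sql is a hypothetical helper, and table names are interpolated as-is on the assumption that they are trusted.

```python
def union_sql(new_table_name, tables, union_all=True, view=False):
    """Build an illustrative CREATE ... AS statement that unions tables."""
    if not tables:
        raise ValueError("tables must contain at least one table name")
    # UNION ALL keeps duplicate rows; plain UNION deduplicates them.
    keyword = "union all" if union_all else "union"
    selects = f"\n{keyword}\n".join(f"select * from {t}" for t in tables)
    object_type = "view" if view else "table"
    return f"create {object_type} {new_table_name} as\n{selects};"


sql = union_sql(
    "myschema.newtable", ["myschema.a", "myschema.b"], union_all=False
)
```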

get_tables(schema=None, table_name=None)[source]

List the tables in a schema including metadata.

Args:
schema: str

Filter by a schema

table_name: str

Filter by a table name

Returns:
Parsons Table

See Parsons Table for output options.

get_table_stats(schema=None, table_name=None)[source]

List table statistics, including row count and size.

Warning

This method is only accessible by Redshift superusers.

Args:
schema: str

Filter by a schema

table_name: str

Filter by a table name

Returns:
Parsons Table

See Parsons Table for output options.

get_columns(schema, table_name)[source]

Gets the column names (and some other column info) for a table.

If you just need the column names, run get_columns_list() as it is faster.

for col in rs.get_columns('some_schema', 'some_table'):
    print(col)
Args:
schema: str

The schema name

table_name: str

The table name

Returns:

A dict mapping column name to a dict with extra info. The keys of the dict are ordered just like the columns in the table. The extra info is a dict with format

{
'data_type': str,
'max_length': int or None,
'max_precision': int or None,
'max_scale': int or None,
'is_nullable': bool
}
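As an illustration of working with that return shape, the sketch below walks a dict of the documented form. The sample values are made up for the example and describe_columns is a hypothetical helper; only the five keys come from the description above.

```python
# Made-up sample shaped like the documented return value of get_columns().
columns = {
    "id": {
        "data_type": "bigint",
        "max_length": None,
        "max_precision": 64,
        "max_scale": 0,
        "is_nullable": False,
    },
    "email": {
        "data_type": "character varying",
        "max_length": 256,
        "max_precision": None,
        "max_scale": None,
        "is_nullable": True,
    },
}


def describe_columns(columns):
    """Return one summary line per column, in table column order."""
    lines = []
    for name, info in columns.items():
        null_text = "null" if info["is_nullable"] else "not null"
        lines.append(f"{name} {info['data_type']} {null_text}")
    return lines


summary = describe_columns(columns)
required = [name for name, info in columns.items() if not info["is_nullable"]]
```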
get_columns_list(schema, table_name)[source]

Gets just the column names for a table.

Args:
schema: str

The schema name

table_name: str

The table name

Returns:

A list of column names.

get_views(schema=None, view=None)[source]

List views.

Args:
schema: str

Filter by a schema

view: str

Filter by a view name

Returns:
Parsons Table

See Parsons Table for output options.

get_queries()[source]

Return the queries currently running and queued, along with resource consumption.

Warning

Must be a Redshift superuser to run this method.

Returns:
Parsons Table

See Parsons Table for output options.

get_max_value(table_name, value_column)[source]

Return the max value from a table.

Args:
table_name: str

Schema and table name

value_column: str

The column containing the values

get_object_type(object_name)[source]

Get object type.

One of view, table, index, sequence, or TOAST table.

Args:
object_name: str

The schema.obj for which to get the object type.

Returns:

str of the object type.

is_view(object_name)[source]

Return true if the object is a view.

Args:
object_name: str

The schema.obj to test if it’s a view.

Returns:

bool

is_table(object_name)[source]

Return true if the object is a table.

Args:
object_name: str

The schema.obj to test if it’s a table.

Returns:

bool

get_table_definition(table)[source]

Get the table definition (i.e. the create statement).

Args:
table: str

The schema.table for which to get the table definition.

Returns:

str

get_table_definitions(schema=None, table=None)[source]

Get the table definition (i.e. the create statement) for multiple tables.

This works similarly to get_table_definition() except it runs a single query to get the DDL for multiple tables. It supports SQL wildcards for schema and table. It only returns the DDL for tables that match schema and table, if they exist.

Args:
schema: str

The schema to filter by.

table: str

The table to filter by.

Returns:

list of dicts with matching tables.

get_view_definition(view)[source]

Get the view definition (i.e. the create statement).

Args:
view: str

The schema.view for which to get the view definition.

Returns:

str

get_view_definitions(schema=None, view=None)[source]

Get the view definition (i.e. the create statement) for multiple views.

This works similarly to get_view_definition() except it runs a single query to get the DDL for multiple views. It supports SQL wildcards for schema and view. It only returns the DDL for views that match schema and view, if they exist.

Args:
schema: str

The schema to filter by.

view: str

The view to filter by.

Returns:

list of dicts with matching views.

static split_full_table_name(full_table_name)[source]

Split a full table name into its schema and table. If a schema isn’t present, return public for the schema, mirroring Redshift, which defaults to the public schema when one isn’t provided.

Eg: (schema, table) = Redshift.split_full_table_name("some_schema.some_table")

Args:
full_table_name: str

The table name, as “schema.table”

Returns:
tuple

A tuple containing (schema, table)

static combine_schema_and_table_name(schema, table)[source]

Creates a full table name by combining a schema and table.

Args:
schema: str

The schema name

table: str

The table name

Returns:
str

The combined full table name
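The behavior of these two static methods can be sketched in a few lines of plain Python. This is an illustrative reimplementation based only on the descriptions above, not the library source; in particular, raising ValueError for names with more than one dot is an assumption.

```python
def split_full_table_name(full_table_name):
    """Split "schema.table" into (schema, table), defaulting to public."""
    if "." not in full_table_name:
        return "public", full_table_name
    parts = full_table_name.split(".")
    if len(parts) != 2:
        # Assumption: names with more than one dot are rejected.
        raise ValueError(f"Invalid table name: {full_table_name}")
    return parts[0], parts[1]


def combine_schema_and_table_name(schema, table):
    """Join a schema and table into a full table name."""
    return f"{schema}.{table}"


schema, table = split_full_table_name("some_schema.some_table")
default_schema, bare_table = split_full_table_name("some_table")
full_name = combine_schema_and_table_name(schema, table)
```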

Schema API

Schema utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftSchema[source]
create_schema_with_permissions(schema, group=None)[source]

Creates a Redshift schema (if it doesn’t already exist), and grants usage permissions to a Redshift group (if specified).

Args:
schema: str

The schema name

group: str

The Redshift group name

type: str

The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)

grant_schema_permissions(schema, group, permissions_type='select')[source]

Grants a Redshift group permissions to all tables within an existing schema.

Args:
schema: str

The schema name

group: str

The Redshift group name

permissions_type: str

The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)