Amazon Web Services

Parsons provides utility functions and/or connectors for three different AWS services.

See the documentation for each service for more details.

Lambda

Overview

Parsons’ distribute_task function lets you distribute the processing of a table’s rows across multiple AWS Lambda invocations.

If you process a table inside AWS Lambda, the number of rows you can handle is limited by the Lambda’s time limit (15 minutes maximum at the time of writing).

Based on experience and some napkin math, a per-row workload that lets a single AWS Lambda invocation process about 1,000 rows can, with this method, be scaled to about 10 million rows.

Rather than converting the table to SQS messages or using other options, the fastest approach is to upload the table to S3 as a CSV file and then invoke multiple Lambda sub-invocations, each of which is sent a byte range of that file to process.
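To make the byte-range idea concrete, here is a minimal standard-library sketch of how a CSV could be cut into ranges that each cover a fixed number of rows. It is an illustration only, not Parsons' implementation: the helper name row_byte_ranges is hypothetical, and it assumes a non-empty CSV with a header row and no newlines inside quoted fields.

```python
from typing import List, Tuple


def row_byte_ranges(csv_bytes: bytes, group_count: int) -> List[Tuple[int, int]]:
    """Return half-open (start, end) byte offsets, each covering up to group_count data rows.

    Assumption: the first line is a header and no field contains a newline.
    """
    lines = csv_bytes.splitlines(keepends=True)
    header_len = len(lines[0])
    ranges = []
    start = offset = header_len
    for row_number, line in enumerate(lines[1:], start=1):
        offset += len(line)
        if row_number % group_count == 0:
            # A full group of rows ends here; close the range.
            ranges.append((start, offset))
            start = offset
    if offset > start:
        # Leftover rows that did not fill a whole group.
        ranges.append((start, offset))
    return ranges


data = b"id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
header = data[: data.index(b"\n") + 1]
ranges = row_byte_ranges(data, group_count=2)
# Each worker would receive the header plus its own slice of the file.
chunks = [header + data[start:end] for start, end in ranges]
```

With five data rows and group_count=2, this yields three ranges, the last holding the single leftover row. Note that Python slices are half-open, whereas an HTTP Range request uses an inclusive end offset.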

Using this method requires some setup. You have three tasks:

  1. Define the function that processes rows. Its first argument must accept your table’s data, though each call will receive only a subset of the rows (e.g. def task_for_distribution(table, **kwargs):)

  2. Where you would have run task_for_distribution(my_table, **kwargs) instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs) (either setting env var S3_TEMP_BUCKET or passing a bucket= parameter)

  3. Set up your Lambda handler to include parsons.aws.event_command() (or run and deploy your Lambda with Zappa)

To test locally, include the argument storage="local", which will test the distribute_task function, but run the task sequentially and in local memory.

QuickStart

A minimalistic example Lambda handler might look something like this:

from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    for row in table:
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return
    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz') would timeout
    # so instead we:
    distribute_task(table, process_table,
                    bucket='my-temp-s3-bucket',
                    func_kwargs={'foo': 789, 'bar': 'baz'})

API

parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3', use_s3_env_token=True)

Distribute processing rows in a table across multiple AWS Lambda invocations.

Args:
table: Parsons Table

Table of data you wish to distribute processing across Lambda invocations of func_to_run argument.

func_to_run: function

The function you want to run whose first argument will be a subset of table

bucket: str

The name of the bucket to use for the S3 upload when processing the whole table. Not required if you set the environment variable S3_TEMP_BUCKET.

func_kwargs: dict

If the function has other arguments to pass along with table then provide them as a dict here. They must all be JSON-able.

func_class: class

If the function is a classmethod or function on a class, then pass the pure class here. E.g. If you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.

func_class_kwargs: dict

If it is a class function, and the class must be instantiated, then pass the kwargs to instantiate the class here. E.g. If you passed ActionKit.bulk_upload_table as the function, then you would pass {‘domain’: …, ‘username’: … etc} here. This must all be JSON-able data.

catch: bool

Lambda will retry running an event three times if there’s an exception. To prevent this, set catch=True; any errors will then be caught and retries stopped. The error will appear in the CloudWatch logs with the string “Distribute Error”. This might be important if row-actions are not idempotent and a failure in your own function would otherwise cause repeats.

group_count: int

Set this to how many rows to process with each Lambda invocation (Default: 100)

storage: str

Debugging option: Defaults to “s3”. To test distribution locally without s3, set to “local”.

use_s3_env_token: bool

If storage is set to “s3”, sets the use_env_token parameter on the S3 storage.

Returns:

Debug information – do not rely on the output, as it will change depending on how this method is invoked.
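Because func_kwargs and func_class_kwargs must be JSON-able, it can help to check them before distributing work. The following is a small sketch using only the standard library; the helper name assert_json_able is hypothetical and not part of Parsons.

```python
import datetime
import json


def assert_json_able(kwargs: dict) -> dict:
    """Round-trip kwargs through JSON; raises TypeError if a value cannot be serialized."""
    return json.loads(json.dumps(kwargs))


# Plain numbers and strings survive the round trip unchanged.
ok = assert_json_able({"foo": 789, "bar": "baz"})

# A date object is not JSON-serializable, so json.dumps raises TypeError.
try:
    assert_json_able({"when": datetime.date(2020, 1, 1)})
    rejected = False
except TypeError:
    rejected = True
```

Values such as dates can be passed as ISO-format strings and parsed again inside the task function. Keep in mind that a JSON round trip turns tuples into lists.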

parsons.aws.event_command(event, context)

Minimal shim to add to the top lambda handler function to enable distributed tasks

The rest of this library is compatible with the zappa.async library. If you have deployed your app with Zappa, then you do NOT need to add this shim.

S3

Overview

The S3 class allows interaction with Amazon Web Service’s object storage service to store and access data objects. It is a wrapper around the AWS SDK boto3. It provides methods to upload and download files from S3 as well as manipulate buckets.

Note

Authentication

Access to S3 is controlled through AWS Identity and Access Management (IAM) users in the AWS Management Console. Users can be granted granular access to AWS resources, including S3. IAM users are provisioned keys, which are required to use the S3 class.

QuickStart

S3 credentials can be passed as environmental variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY), stored in an AWS CLI file ~/.aws/credentials, or passed as keyword arguments.

from parsons import S3, Table

# First approach: Pass API credentials via environmental variables or an AWS CLI file
s3 = S3()

# Second approach: Pass API credentials as arguments
s3 = S3(aws_access_key_id='MY_KEY', aws_secret_access_key='MY_SECRET')

# Put an arbitrary file in an S3 bucket (put_file takes the local path of the file)
s3.put_file('my_bucket', 'winning.csv', 'winning_formula.csv')

# Put a Parsons Table as a CSV using convenience method.
tbl = Table.from_csv('winning_formula.csv')
tbl.to_s3_csv('my_bucket', 'winning.csv')

# Download a csv file and convert to a table (get_file returns the local file path)
f = s3.get_file('my_bucket', 'my_dir/my_file.csv')
tbl = Table.from_csv(f)

# List buckets that you have access to
s3.list_buckets()

# List the keys in a bucket
s3.list_keys('my_bucket')

API

class parsons.S3(aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, use_env_token=True)

Instantiate the S3 class.

Args:
aws_access_key_id: str

The AWS access key id. Not required if the AWS_ACCESS_KEY_ID env variable is set.

aws_secret_access_key: str

The AWS secret access key. Not required if the AWS_SECRET_ACCESS_KEY env variable is set.

aws_session_token: str

The AWS session token. Optional. Can also be stored in the AWS_SESSION_TOKEN env variable. Used for accessing S3 with temporary credentials.

use_env_token: boolean

Controls use of the AWS_SESSION_TOKEN environment variable. Defaults to True. Set to False in order to ignore the AWS_SESSION_TOKEN environment variable even if the aws_session_token argument was not passed in.

Returns:

S3 class.

s3

Boto3 API Session Resource object. Use for more advanced boto3 features.

client

Boto3 API Session client object. Use for more advanced boto3 features.
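The interaction between aws_session_token and use_env_token described in the Args above can be summarized as a small decision function. This is a sketch of the documented behaviour, not Parsons' source code; the function name resolve_session_token and its environ parameter are hypothetical.

```python
import os
from typing import Mapping, Optional


def resolve_session_token(aws_session_token: Optional[str] = None,
                          use_env_token: bool = True,
                          environ: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Pick the session token following the rules described for the S3 class."""
    if environ is None:
        environ = os.environ
    if aws_session_token is not None:
        # An explicitly passed token always wins.
        return aws_session_token
    if use_env_token:
        # Fall back to the environment variable when allowed.
        return environ.get("AWS_SESSION_TOKEN")
    # use_env_token=False: ignore the environment variable entirely.
    return None


fake_env = {"AWS_SESSION_TOKEN": "token-from-env"}
explicit = resolve_session_token("token-from-arg", True, fake_env)
from_env = resolve_session_token(None, True, fake_env)
ignored = resolve_session_token(None, False, fake_env)
```

Setting use_env_token=False is useful when a session token in the environment belongs to a different set of credentials than the access key you pass in.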

list_buckets()

List all buckets to which you have access.

Returns:

list

bucket_exists(bucket)

Determine if a bucket exists and you have access to it.

Args:
bucket: str

The bucket name

Returns:
boolean

True if the bucket exists and False if not.

list_keys(bucket, prefix=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, **kwargs)

List the keys in a bucket, along with extra info about each one.

Args:
bucket: str

The bucket name

prefix: str

Limits the response to keys that begin with the specified prefix.

suffix: str

Limits the response to keys that end with specified suffix

regex: str

Limits the response to keys that match a regex pattern

date_modified_before: datetime.datetime

Limits the response to keys with date modified before

date_modified_after: datetime.datetime

Limits the response to keys with date modified after

kwargs:

Additional arguments for the S3 API call. See AWS ListObjectsV2 documentation for more info.

Returns:
dict

Dict mapping the keys to info about each key. The info includes ‘LastModified’, ‘Size’, and ‘Owner’.
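To illustrate how the list_keys filters combine, the sketch below applies the same kinds of filters to an in-memory dict shaped like the return value. It is not Parsons' implementation: the function filter_keys is hypothetical, and it assumes strict date comparisons, re.search semantics for the regex, and naive datetimes in the sample data (S3 itself reports timezone-aware timestamps).

```python
import re
from datetime import datetime


def filter_keys(keys, prefix=None, suffix=None, regex=None,
                date_modified_before=None, date_modified_after=None):
    """Keep only the entries that pass every filter that was supplied."""
    result = {}
    for key, info in keys.items():
        if prefix and not key.startswith(prefix):
            continue
        if suffix and not key.endswith(suffix):
            continue
        if regex and not re.search(regex, key):
            continue
        modified = info["LastModified"]
        if date_modified_before and not modified < date_modified_before:
            continue
        if date_modified_after and not modified > date_modified_after:
            continue
        result[key] = info
    return result


keys = {
    "reports/2023/jan.csv": {"LastModified": datetime(2023, 1, 31), "Size": 120},
    "reports/2023/feb.csv": {"LastModified": datetime(2023, 2, 28), "Size": 95},
    "reports/2023/notes.txt": {"LastModified": datetime(2023, 2, 15), "Size": 4},
    "archive/old.csv": {"LastModified": datetime(2020, 5, 1), "Size": 300},
}
recent_reports = filter_keys(keys, prefix="reports/", suffix=".csv",
                             date_modified_after=datetime(2023, 2, 1))
```

Here only reports/2023/feb.csv passes all three filters: the others fail on the prefix, the suffix, or the modification date.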

key_exists(bucket, key)

Determine if a key exists in a bucket.

Args:
bucket: str

The bucket name

key: str

The object key

Returns:
boolean

True if key exists and False if not.

create_bucket(bucket)

Create an s3 bucket.

Warning

S3 has a limit on the number of buckets you can create in an AWS account, and that limit is fairly low (typically 100). If you are creating buckets frequently, you may be mis-using S3, and should consider using the same bucket for multiple tasks. There is no limit on the number of objects in a bucket. See AWS bucket restrictions for more info.

Warning

S3 bucket names are globally unique. So when creating a new bucket, the name can’t collide with any existing bucket names. If the provided name does collide, you’ll see errors like IllegalLocationConstraintException or BucketAlreadyExists.

Args:
bucket: str

The name of the bucket to create

Returns:

None

put_file(bucket, key, local_path, acl='bucket-owner-full-control', **kwargs)

Uploads an object to an S3 bucket

Args:
bucket: str

The bucket name

key: str

The object key

local_path: str

The local path of the file to upload

acl: str

The S3 permissions on the file

kwargs:

Additional arguments for the S3 API call. See AWS Put Object documentation for more info.

remove_file(bucket, key)

Deletes an object from an S3 bucket

Args:
bucket: str

The bucket name

key: str

The object key

Returns:

None

get_file(bucket, key, local_path=None, **kwargs)

Download an object from S3 to a local file

Args:
local_path: str

The local path where the file will be downloaded. If not specified, a temporary file will be created and returned, and that file will be removed automatically when the script is done running.

bucket: str

The bucket name

key: str

The object key

kwargs:

Additional arguments for the S3 API call. See AWS download_file documentation for more info.

Returns:
str

The path of the new file

get_url(bucket, key, expires_in=3600)

Generates a presigned url for an s3 object.

Args:
bucket: str

The bucket name

key: str

The object name

expires_in: int

The time, in seconds, until the url expires

Returns:
Url:

A link to download the object

transfer_bucket(origin_bucket, origin_key, destination_bucket, destination_key=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, public_read=False, remove_original=False, **kwargs)

Transfer files between s3 buckets

Args:
origin_bucket: str

The origin bucket

origin_key: str

The origin file or prefix

destination_bucket: str

The destination bucket

destination_key: str

If None, the origin key is retained. If set to a prefix, all keys are moved to the new prefix.

suffix: str

Limits the response to keys that end with specified suffix

regex: str

Limits the response to keys that match a regex pattern

date_modified_before: datetime.datetime

Limits the response to keys with date modified before

date_modified_after: datetime.datetime

Limits the response to keys with date modified after

public_read: bool

If the keys should be set to public-read

remove_original: bool

If the original keys should be removed after transfer

kwargs:

Additional arguments for the S3 API call. See AWS download_file docs for more info.

Returns:

None

get_buckets_with_subname(bucket_subname)

Grabs a type of bucket based on naming convention.

Args:
bucket_subname: str

This will most commonly be a ‘vendor’

Returns:
list

list of buckets

Temporary Credentials

The S3 API supports creating temporary credentials for one-off operations, such as pushing a file to a particular key in a particular bucket. For example, the Mapbox API allows you to request temporary credentials that grant you access to a bucket where you can upload map data. When S3 returns a set of temporary credentials it also returns a session token that needs to be included with the standard credentials for them to be accepted. The S3 class can be passed a session token as an environmental variable (AWS_SESSION_TOKEN) or as a keyword argument.

from parsons import S3

# First approach: Pass session token via AWS_SESSION_TOKEN environmental variable
s3 = S3()

# Second approach: Pass session token as an argument
creds = request_temporary_credentials()
s3 = S3(aws_access_key_id=creds['id'], aws_secret_access_key=creds['key'],
        aws_session_token=creds['token'])

Redshift

Overview

The Redshift class allows you to interact with an Amazon Redshift relational database. The connector utilizes the psycopg2 Python package under the hood. The core methods focus on input, output and querying of the database.

In addition to the core API integration provided by the Redshift class, Parsons also includes utility functions for managing schemas and tables. See Table and View API and Schema API for more information.

Note

S3 Credentials

Redshift only allows data to be copied to the database via S3. As such, the copy() and copy_s3() methods require S3 credentials and write access on an S3 bucket, which will be used for storing data en route to Redshift. See the API documentation for more information about AWS Redshift authorization.

Whitelisting

Remember to ensure that the IP address from which you are connecting has been whitelisted.

Quickstart

Redshift API credentials can either be passed as environmental variables (REDSHIFT_USERNAME, REDSHIFT_PASSWORD, REDSHIFT_HOST, REDSHIFT_DB, and REDSHIFT_PORT) or as keyword arguments. Methods that use COPY require an access key ID and a secret access key, which can also be passed as environmental variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or keyword arguments.

from parsons import Redshift

# Pass credentials as environmental variables
rs = Redshift()

# Pass credentials as keyword arguments
rs = Redshift(username='my_username', password='my_password', host='my_host',
              db='my_db', port=5439)

# Query the Database
tbl = rs.query('select * from tmc_scratch.test_data')

# Copy a Parsons Table to the Database
rs.copy(tbl, 'tmc_scratch.test_table', if_exists='drop')

All of the standard COPY options can be passed as kwargs. See the copy() method for all options.

Core API

class parsons.Redshift(username=None, password=None, host=None, db=None, port=None, timeout=10, s3_temp_bucket=None, aws_access_key_id=None, aws_secret_access_key=None, iam_role=None, use_env_token=True)[source]

A Redshift class to connect to database.

Args:
username: str

Required if env variable REDSHIFT_USERNAME not populated

password: str

Required if env variable REDSHIFT_PASSWORD not populated

host: str

Required if env variable REDSHIFT_HOST not populated

db: str

Required if env variable REDSHIFT_DB not populated

port: int

Required if env variable REDSHIFT_PORT not populated. Port 5439 is typical.

timeout: int

Seconds to timeout if connection not established

s3_temp_bucket: str

Name of the S3 bucket that will be used for storing data during bulk transfers. Required if you intend to perform bulk data transfers (eg. the copy_s3 method), and env variable S3_TEMP_BUCKET is not populated.

aws_access_key_id: str

The default AWS access key id for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_ACCESS_KEY_ID.

aws_secret_access_key: str

The default AWS secret access key for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_SECRET_ACCESS_KEY.

iam_role: str

AWS IAM Role ARN string – an optional, different way for credentials to be provided in the Redshift copy command that does not require an access key.

use_env_token: bool

Controls use of the AWS_SESSION_TOKEN environment variable for S3. Defaults to True. Set to False in order to ignore the AWS_SESSION_TOKEN environment variable even if the aws_session_token argument was not passed in.

parsons.Redshift.connection(self)

Generate a Redshift connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.

When using the connection, make sure to put it in a with block (necessary for any context manager): with rs.connection() as conn:

Returns:

Psycopg2 connection object

parsons.Redshift.query(self, sql: str, parameters: list | None = None) -> Table | None

Execute a query against the Redshift database. Will return None if the query returns zero rows.

To include python variables in your query, it is recommended to pass them as parameters, following the psycopg style. Using the parameters argument ensures that values are escaped properly, and avoids SQL injection attacks.

Parameter Examples

# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
rs.query(sql, parameters=[name])

# To pass a list of values, generate one placeholder per item.
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
rs.query(sql, parameters=names)

Args:
sql: str

A valid SQL statement

parameters: list

A list of python variables to be converted into SQL values in your query

Returns:
Parsons Table

See Parsons Table for output options.

parsons.Redshift.query_with_connection(self, sql, connection, parameters=None, commit=True)

Execute a query against the Redshift database, with an existing connection. Useful for batching queries together. Will return None if the query returns zero rows.

Args:
sql: str

A valid SQL statement

connection: obj

A connection object obtained from redshift.connection()

parameters: list

A list of python variables to be converted into SQL values in your query

commit: boolean

Whether to commit the transaction immediately. If False the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with connection.commit()).

Returns:
Parsons Table

See Parsons Table for output options.
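As a sketch of batching, the helper below runs several statements on one connection with commit=False, so that they are committed together when the with block exits. The helper run_in_one_transaction and the FakeRedshift stand-in are hypothetical and exist only so the example runs without a cluster; with a real database you would pass a parsons Redshift instance instead.

```python
from contextlib import contextmanager


def run_in_one_transaction(rs, statements):
    """Run (sql, parameters) pairs on a single connection, deferring the commit."""
    with rs.connection() as conn:
        for sql, parameters in statements:
            rs.query_with_connection(sql, conn, parameters=parameters, commit=False)


class FakeRedshift:
    """Offline stand-in exposing only the two methods used above."""

    def __init__(self):
        self.log = []

    @contextmanager
    def connection(self):
        self.log.append("open")
        yield "conn"
        # Reached only if the block finished without raising.
        self.log.append("commit and close")

    def query_with_connection(self, sql, connection, parameters=None, commit=True):
        self.log.append((sql, parameters, commit))


rs = FakeRedshift()
run_in_one_transaction(rs, [
    ("DELETE FROM my_table WHERE name = %s", ["Beatrice O'Brady"]),
    ("INSERT INTO my_table (name) VALUES (%s)", ["Beatrice O'Brady"]),
])
```

Deferring the commit means the delete and the insert take effect together or not at all, which is the usual reason to batch statements on one connection.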

parsons.Redshift.copy(self, tbl: Table, table_name: str, if_exists: str = 'fail', max_errors: int = 0, distkey: str | None = None, sortkey: str | None = None, padding: float | None = None, statupdate: bool | None = None, compupdate: bool | None = None, acceptanydate: bool = True, emptyasnull: bool = True, blanksasnull: bool = True, nullas: str | None = None, acceptinvchars: bool = True, dateformat: str = 'auto', timeformat: str = 'auto', varchar_max: List[str] | None = None, truncatecolumns: bool = False, columntypes: dict | None = None, specifycols: bool | None = None, alter_table: bool = False, alter_table_cascade: bool = False, aws_access_key_id: str | None = None, aws_secret_access_key: str | None = None, iam_role: str | None = None, cleanup_s3_file: bool = True, template_table: str | None = None, temp_bucket_region: str | None = None, strict_length: bool = True, csv_encoding: str = 'utf-8')

Copy a Parsons Table to Redshift.

Args:
tbl: obj

A Parsons Table.

table_name: str

The destination table name (ex. my_schema.my_table).

if_exists: str

If the table already exists, either fail, append, drop or truncate the table.

max_errors: int

The maximum number of rows that can error and be skipped before the job fails.

distkey: str

The column name of the distkey

sortkey: str

The column name of the sortkey

padding: float

A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.

statupdate: boolean

Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command. If True, explicitly sets statupdate to on; if False, explicitly sets statupdate to off. If None, statistics are updated only if the table is initially empty. Defaults to None. See Redshift docs for more details.

Note

If STATUPDATE is used, the current user must be either the table owner or a superuser.

compupdate: boolean

Controls whether compression encodings are automatically applied during a COPY. If True explicitly sets compupdate to on, if False explicitly sets compupdate to off. If None the COPY command only chooses compression if the table is initially empty. Defaults to None. See Redshift docs for more details.

acceptanydate: boolean

Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.

emptyasnull: boolean

Indicates that Amazon Redshift should load empty char and varchar fields as NULL.

blanksasnull: boolean

Loads blank varchar fields, which consist of only white space characters, as NULL.

nullas: str

Loads fields that match string as NULL

acceptinvchars: boolean

Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.

dateformat: str

Set the date format. Defaults to auto.

timeformat: str

Set the time format. Defaults to auto.

varchar_max: list

A list of columns in which to set the width of the varchar column to 65,535 characters.

truncatecolumns: boolean

If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.

columntypes: dict

Optional map of column name to redshift column type, overriding the usual type inference. You only specify the columns you want to override, eg. columntypes={'phone': 'varchar(12)', 'age': 'int'}.

specifycols: boolean

Adds a column list to the Redshift COPY command, allowing the source table in an append to have its columns out of order, and to have fewer columns, with any leftover target table columns filled in with the DEFAULT value.

This will fail if any of the source table’s columns does not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.

alter_table: boolean

Will check if the target table varchar widths are wide enough to copy in the table data. If not, will attempt to alter the table to make it wide enough. This will not work with tables that have dependent views. To drop them, set alter_table_cascade to True.

alter_table_cascade: boolean

Will drop dependent objects when attempting to alter the table. If alter_table is False, this will be ignored.

aws_access_key_id:

An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

aws_secret_access_key:

An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

iam_role: str

An AWS IAM Role ARN string; an alternative credential for the COPY command from Redshift to S3. The IAM role must have been assigned to the Redshift instance and have access to the S3 bucket.

cleanup_s3_file: boolean

The s3 upload is removed by default on cleanup. You can set to False for debugging.

template_table: str

Instead of specifying columns, columntypes, and/or inference, if there is a pre-existing table that has the same columns/types, then use the template_table table name as the schema for the new table. Unless you set specifycols=False explicitly, a template_table will set it to True.

temp_bucket_region: str

The AWS region that the temp bucket (specified by the S3_TEMP_BUCKET environment variable) is located in. This should be provided if the Redshift cluster is located in a different region from the temp bucket.

strict_length: bool

Whether or not to tightly fit the length of the table columns to the length of the data in tbl; if padding is specified, this argument is ignored.

csv_encoding: str

String encoding to use when writing the temporary CSV file that is uploaded to S3. Defaults to ‘utf-8’.

Returns:
Parsons Table or None

See Parsons Table for output options.
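The padding argument is easiest to understand with numbers. The sketch below assumes that padding is a fraction (0.25 meaning 25%) applied to the longest value in each column, with the result truncated to an integer. That is an assumption about the semantics, not a confirmed description of Parsons' internals, and the function varchar_widths is hypothetical.

```python
def varchar_widths(rows, padding=0.0):
    """Compute a varchar width per column from the longest value, plus optional padding."""
    longest = {}
    for row in rows:
        for column, value in row.items():
            longest[column] = max(longest.get(column, 0), len(str(value)))
    # Assumption: padding is a fraction of the longest value, truncated to an int.
    return {column: int(n + n * padding) for column, n in longest.items()}


rows = [
    {"name": "Beatrice O'Brady", "state": "IL"},
    {"name": "Allen Smith", "state": "NY"},
]
exact = varchar_widths(rows)
padded = varchar_widths(rows, padding=0.25)
```

Under these assumptions, the 16-character name column grows to 20 with 25% padding, while the 2-character state column stays at 2 because the extra half character is truncated.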

parsons.Redshift.copy_s3(self, table_name, bucket, key, manifest=False, data_type='csv', csv_delimiter=',', compression=None, if_exists='fail', max_errors=0, distkey=None, sortkey=None, padding=None, varchar_max=None, statupdate=True, compupdate=True, ignoreheader=1, acceptanydate=True, dateformat='auto', timeformat='auto', emptyasnull=True, blanksasnull=True, nullas=None, acceptinvchars=True, truncatecolumns=False, columntypes=None, specifycols=None, aws_access_key_id=None, aws_secret_access_key=None, bucket_region=None, strict_length=True, template_table=None, encoding='utf-8', line_delimited=False)

Copy a file from s3 to Redshift.

Args:
table_name: str

The destination table name, including schema (ex. tmc.cool_table), into which the file will be copied.

bucket: str

The s3 bucket where the file or manifest is located.

key: str

The key of the file or manifest in the s3 bucket.

manifest: boolean

Set to True if key refers to a manifest file rather than a data file.

data_type: str

The data type of the file. Only csv supported currently.

csv_delimiter: str

The delimiter of the csv. Only relevant if data_type is csv.

compression: str

If specified (gzip), will attempt to decompress the file.

if_exists: str

If the table already exists, either fail, append, drop or truncate the table.

max_errors: int

The maximum number of rows that can error and be skipped before the job fails.

distkey: str

The column name of the distkey

sortkey: str

The column name of the sortkey

padding: float

A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.

varchar_max: list

A list of columns in which to set the width of the varchar column to 65,535 characters.

statupdate: boolean

Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command.

compupdate: boolean

Controls whether compression encodings are automatically applied during a COPY.

ignoreheader: int

The number of header rows to skip. Ignored if data_type is json.

acceptanydate: boolean

Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.

emptyasnull: boolean

Indicates that Amazon Redshift should load empty char and varchar fields as NULL.

blanksasnull: boolean

Loads blank varchar fields, which consist of only white space characters, as NULL.

nullas: str

Loads fields that match string as NULL

acceptinvchars: boolean

Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.

dateformat: str

Set the date format. Defaults to auto.

timeformat: str

Set the time format. Defaults to auto.

truncatecolumns: boolean

If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.

columntypes: dict

Optional map of column name to redshift column type, overriding the usual type inference. You only specify the columns you want to override, eg. columntypes={'phone': 'varchar(12)', 'age': 'int'}.

specifycols: boolean

Adds a column list to the Redshift COPY command, allowing the source table in an append to have its columns out of order, and to have fewer columns, with any leftover target table columns filled in with the DEFAULT value.

This will fail if any of the source table’s columns does not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.

aws_access_key_id:

An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

aws_secret_access_key:

An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

bucket_region: str

The AWS region that the bucket is located in. This should be provided if the Redshift cluster is located in a different region from the bucket.

strict_length: bool

If the database table needs to be created, strict_length determines whether the created table’s column sizes will be sized to exactly fit the current data, or if their size will be rounded up to account for future values being larger than the current dataset. Defaults to True; this argument is ignored if padding is specified.

template_table: str

Instead of specifying columns, columntypes, and/or inference, if there is a pre-existing table that has the same columns/types, then use the template_table table name as the schema for the new table.

Returns:
Parsons Table or None

See Parsons Table for output options.
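The two failure conditions described for specifycols can be checked up front. The sketch below is a plain-Python illustration of those rules, assuming you already know the column names of both tables; the function check_specifycols is hypothetical and not part of Parsons.

```python
def check_specifycols(source_columns, target_columns, identity_columns=()):
    """Return a list of problems that would make a column-list COPY fail."""
    problems = []
    # Every source column must exist in the target table.
    missing = [c for c in source_columns if c not in target_columns]
    if missing:
        problems.append(f"not in target table: {missing}")
    # The source must not supply values for an IDENTITY column.
    clash = [c for c in source_columns if c in identity_columns]
    if clash:
        problems.append(f"IDENTITY columns present in source: {clash}")
    return problems


# Fewer columns in a different order is fine.
fine = check_specifycols(["email", "id"], ["id", "email", "created_at"])

# An unknown column and an IDENTITY column both cause failures.
broken = check_specifycols(["id", "phone"], ["id", "email"], identity_columns=["id"])
```

The first call returns an empty list, since both source columns exist in the target. The second reports both problems.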

parsons.Redshift.unload(self, sql, bucket, key_prefix, manifest=True, header=True, delimiter='|', compression='gzip', add_quotes=True, null_as=None, escape=True, allow_overwrite=True, parallel=True, max_file_size='6.2 GB', extension=None, aws_region=None, aws_access_key_id=None, aws_secret_access_key=None)

Unload Redshift data to S3 Bucket. This is a more efficient method than running a query to export data as it can export in parallel and directly into an S3 bucket. Consider using this for exports of 10MM or more rows.

Args:
sql: str

The SQL string to execute to generate the data to unload.

bucket: str

The destination S3 bucket

key_prefix: str

The prefix of the key names that will be written

manifest: boolean

Creates a manifest file that explicitly lists details for the data files that are created by the UNLOAD process.

header: boolean

Adds a header line containing column names at the top of each output file.

delimiter: str

Specifies the character used to separate fields. Defaults to ‘|’.

compression: str

One of gzip, bzip2 or None. Unloads data to one or more compressed files per slice. Each resulting file is appended with a .gz or .bz2 extension.

add_quotes: boolean

Places quotation marks around each unloaded data field, so that Amazon Redshift can unload data values that contain the delimiter itself.

null_as: str

Specifies a string that represents a null value in unload files. If this option is not specified, null values are unloaded as zero-length strings for delimited output.

escape: boolean

For CHAR and VARCHAR columns in delimited unload files, an escape character (\) is placed before every linefeed, carriage return, escape character and delimiter.

allow_overwrite: boolean

If True, will overwrite existing files, including the manifest file. If False will fail.

parallel: boolean

By default, UNLOAD writes data in parallel to multiple files, according to the number of slices in the cluster. The default option is ON or TRUE. If PARALLEL is OFF or FALSE, UNLOAD writes to one or more data files serially, sorted absolutely according to the ORDER BY clause, if one is used.

max_file_size: str

The maximum size of files UNLOAD creates in Amazon S3. Specify a decimal value between 5 MB and 6.2 GB.

extension: str

This extension will be added to the end of file names loaded to S3

aws_region: str

The AWS Region where the target Amazon S3 bucket is located. REGION is required for UNLOAD to an Amazon S3 bucket that is not in the same AWS Region as the Amazon Redshift cluster.

aws_access_key_id:

An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

aws_secret_access_key:

An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.

parsons.Redshift.upsert(self, table_obj, target_table, primary_key, vacuum=True, distinct_check=True, cleanup_temp_table=True, alter_table=True, alter_table_cascade=False, from_s3=False, distkey=None, sortkey=None, **copy_args)

Perform an upsert on an existing table. An upsert updates the rows that already exist in the target table and inserts the rows that do not, in a single operation.

Args:
table_obj: obj

A Parsons table object

target_table: str

The schema and table name to upsert

primary_key: str or list

The primary key column(s) of the target table

vacuum: boolean

Re-sorts rows and reclaims space in the specified table. You must be a table owner or superuser to effectively vacuum a table; however, the method will not fail if you lack these privileges.

distinct_check: boolean

Check if the primary key column is distinct. Raise error if not.

cleanup_temp_table: boolean

A temp table is dropped by default on cleanup. You can set to False for debugging.

alter_table: boolean

Set to False to avoid automatic varchar column resizing to accommodate new data.

alter_table_cascade: boolean

Will drop dependent objects when attempting to alter the table. If alter_table is False, this will be ignored.

from_s3: boolean

Instead of specifying a table_obj (set the first argument to None), set this to True and include copy_s3() arguments to upsert a pre-existing s3 file into the target_table

distkey: str

The column name of the distkey. If not provided, will default to primary_key.

sortkey: str or list

The column name(s) of the sortkey. If not provided, will default to primary_key.

**copy_args: kwargs

See copy() for options.
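
The update-and-insert behaviour described above can be illustrated with a staging-table pattern. The following is a minimal sketch using SQLite from the standard library so that it runs anywhere. It is not the Parsons implementation: the helper name upsert_rows, the staging-table naming, and the delete-then-insert strategy are all assumptions chosen for illustration.

```python
import sqlite3


def upsert_rows(conn, target_table, rows, primary_key, distinct_check=True):
    """Illustrative staging-table upsert (hypothetical helper, not Parsons code)."""
    keys = [row[primary_key] for row in rows]
    if distinct_check and len(keys) != len(set(keys)):
        raise ValueError("Primary key column contains duplicate values.")

    columns = list(rows[0].keys())
    col_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    staging = f"{target_table}_staging"

    cur = conn.cursor()
    # 1. Stage the incoming rows in a temp table shaped like the target.
    cur.execute(f"CREATE TEMP TABLE {staging} AS SELECT * FROM {target_table} WHERE 0")
    cur.executemany(
        f"INSERT INTO {staging} ({col_list}) VALUES ({placeholders})",
        [tuple(row[c] for c in columns) for row in rows],
    )
    # 2. Delete target rows whose key also appears in the staging table.
    cur.execute(
        f"DELETE FROM {target_table} "
        f"WHERE {primary_key} IN (SELECT {primary_key} FROM {staging})"
    )
    # 3. Insert every staged row, then drop the staging table.
    cur.execute(f"INSERT INTO {target_table} ({col_list}) SELECT {col_list} FROM {staging}")
    cur.execute(f"DROP TABLE {staging}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO people VALUES (?, ?)", [(1, "Ada"), (2, "Bob")])

# Row 2 already exists and is updated; row 3 is new and is inserted.
upsert_rows(conn, "people", [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cy"}], "id")
result = sorted(conn.execute("SELECT id, name FROM people").fetchall())
print(result)
```

Against Redshift itself, the equivalent call would pass a Parsons table, the target table name, and the primary key to upsert(), as in the signature above.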

parsons.Redshift.generate_manifest(self, buckets, aws_access_key_id=None, aws_secret_access_key=None, mandatory=True, prefix=None, manifest_bucket=None, manifest_key=None, path=None)

Given a list of S3 buckets, generate a manifest file (JSON format). A manifest file allows you to copy multiple files into a single table at once. Once the manifest is generated, you can pass it with the copy_s3() method.

AWS keys are not required if the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set.

Args:

buckets: list or str

A list of buckets or single bucket from which to generate manifest

aws_access_key_id: str

AWS access key id to access S3 bucket

aws_secret_access_key: str

AWS secret access key to access S3 bucket

mandatory: boolean

The mandatory flag indicates whether the Redshift COPY should terminate if the file does not exist.

prefix: str

Optional filter for key prefixes

manifest_bucket: str

Optional bucket to write manifest file.

manifest_key: str

Optional key name for S3 bucket to write file

Returns:

dict of manifest
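
For orientation, a Redshift COPY manifest is a JSON document with an entries list, where each entry carries the S3 URL of a file and a mandatory flag. The sketch below builds a dict of that shape in plain Python. The helper build_manifest and the bucket and key names are hypothetical, and the exact dict that generate_manifest() returns may differ in detail.

```python
import json


def build_manifest(keys_by_bucket, mandatory=True):
    """Build a manifest-shaped dict from {bucket: [keys]} (illustrative helper)."""
    entries = []
    for bucket, keys in keys_by_bucket.items():
        for key in keys:
            entries.append({"url": f"s3://{bucket}/{key}", "mandatory": mandatory})
    return {"entries": entries}


# Hypothetical bucket and key names.
manifest = build_manifest(
    {"my-bucket": ["exports/part_0000.csv", "exports/part_0001.csv"]}
)
print(json.dumps(manifest, indent=2))
```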

parsons.Redshift.alter_table_column_type(self, table_name, column_name, data_type, varchar_width=None)

Alter a column type of an existing table.

table_name: str

The table name (ex. my_schema.my_table).

column_name: str

The target column name

data_type: str

A valid Redshift data type to alter the table to.

varchar_width:

The new width of the column if of type varchar.
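
As a rough guide to what this method asks Redshift to do, the sketch below builds an ALTER TABLE ... ALTER COLUMN ... TYPE statement from the same four arguments. The helper build_alter_column_sql is hypothetical, and the SQL that Parsons actually issues is assumed rather than confirmed here.

```python
def build_alter_column_sql(table_name, column_name, data_type, varchar_width=None):
    """Build an ALTER COLUMN statement (hypothetical helper, for illustration)."""
    sql = f"ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {data_type}"
    if varchar_width is not None:
        # The width only applies to varchar columns.
        sql += f"({varchar_width})"
    return sql + ";"


sql = build_alter_column_sql("my_schema.my_table", "notes", "varchar", varchar_width=1024)
print(sql)
```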

Table and View API

Table and view utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftTableUtilities
table_exists(table_name: str, view: bool = True) -> bool

Check if a table or view exists in the database.

Args:
table_name: str

The table name and schema (e.g. myschema.mytable).

view: boolean

Check to see if a view exists by the same name

Returns:
boolean

True if the table exists and False if it does not.

get_row_count(table_name)

Return the row count of a table.

SQL Code

SELECT COUNT(*) FROM myschema.mytable
Args:
table_name: str

The schema and name (e.g. myschema.mytable) of the table.

Returns:

int

rename_table(table_name, new_table_name)

Rename an existing table.

Note

You cannot move schemas when renaming a table. Instead, use the duplicate_table() method.

Args:
table_name: str

Name of existing schema and table (e.g. myschema.oldtable)

new_table_name: str

New name for table with the schema omitted (e.g. newtable).

move_table(source_table, new_table, drop_source_table=False)

Move an existing table in the database. It will inherit encoding, sortkey and distkey. Once run, the source table will be empty. This is more efficient than running "create newtable as select * from oldtable".

For more information see: ALTER TABLE APPEND

Args:
source_table: str

Name of existing schema and table (e.g. my_schema.old_table)

new_table: str

New name of schema and table (e.g. my_schema.new_table)

drop_source_table: boolean

Drop the source table.

Returns:

None
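
The move described above relies on Redshift's ALTER TABLE APPEND, which transfers rows from one table to another rather than copying them. A minimal sketch of the statements involved, under the assumption that the new table is first created with the same structure as the source, might look like this. The helper build_move_table_sql is hypothetical and only assembles SQL strings.

```python
def build_move_table_sql(source_table, new_table, drop_source_table=False):
    """Assemble the statements for an APPEND-based move (illustrative only)."""
    statements = [
        # LIKE copies column definitions, encoding, distkey and sortkey.
        f"CREATE TABLE {new_table} (LIKE {source_table});",
        # APPEND moves the rows, leaving the source table empty.
        f"ALTER TABLE {new_table} APPEND FROM {source_table};",
    ]
    if drop_source_table:
        statements.append(f"DROP TABLE {source_table};")
    return statements


statements = build_move_table_sql(
    "my_schema.old_table", "my_schema.new_table", drop_source_table=True
)
for statement in statements:
    print(statement)
```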

populate_table_from_query(query, destination_table, if_exists='fail', distkey=None, sortkey=None)

Populate a Redshift table with the results of a SQL query, creating the table if it doesn’t yet exist.

Args:
query: str

The SQL query

destination_table: str

Name of destination schema and table (e.g. my_schema.new_table)

if_exists: str

If the table already exists, either fail, append, drop, or truncate the table.

distkey: str

The column to use as the distkey for the table.

sortkey: str

The column to use as the sortkey for the table.
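
The four if_exists modes can be easier to follow with a runnable analogue. The sketch below reproduces the fail, append, drop, and truncate behaviour against an in-memory SQLite database. The helper populate_from_query and the table names are hypothetical; it is meant to show the semantics, not how Parsons implements them on Redshift.

```python
import sqlite3


def populate_from_query(conn, query, destination_table, if_exists="fail"):
    """SQLite analogue of the if_exists handling (illustrative only)."""
    if if_exists not in ("fail", "append", "drop", "truncate"):
        raise ValueError(f"Unknown if_exists value: {if_exists}")

    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (destination_table,),
    ).fetchone() is not None

    if exists and if_exists == "fail":
        raise ValueError(f"Table {destination_table} already exists.")
    if exists and if_exists == "drop":
        conn.execute(f"DROP TABLE {destination_table}")
        exists = False
    if exists and if_exists == "truncate":
        conn.execute(f"DELETE FROM {destination_table}")

    if exists:
        # append, or truncate after the rows have been cleared
        conn.execute(f"INSERT INTO {destination_table} {query}")
    else:
        conn.execute(f"CREATE TABLE {destination_table} AS {query}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE donations (donor TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO donations VALUES (?, ?)", [("a", 5), ("b", 50), ("c", 500)]
)
query = "SELECT donor, amount FROM donations WHERE amount >= 50"

populate_from_query(conn, query, "large_donations")              # creates the table
populate_from_query(conn, query, "large_donations", "append")    # adds the rows again
populate_from_query(conn, query, "large_donations", "truncate")  # clears, then reloads
row_count = conn.execute("SELECT COUNT(*) FROM large_donations").fetchone()[0]
print(row_count)
```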

duplicate_table(source_table, destination_table, where_clause='', if_exists='fail', drop_source_table=False)

Create a copy of an existing table (or subset of rows) in a new table. It will inherit encoding, sortkey and distkey.

Args:
source_table: str

Name of existing schema and table (e.g. myschema.oldtable)

destination_table: str

Name of destination schema and table (e.g. myschema.newtable)

where_clause: str

An optional where clause (e.g. where org = 1).

if_exists: str

If the table already exists, either fail, append, drop, or truncate the table.

drop_source_table: boolean

Drop the source table

union_tables(new_table_name, tables, union_all=True, view=False)

Union a series of tables into a new table.

Args:
new_table_name: str

The new table and schema (e.g. myschema.newtable)

tables: list

A list of tables to union

union_all: boolean

If False will deduplicate rows. If True will include duplicate rows.

view: boolean

Create a view rather than a static table

Returns:

None
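
The effect of union_all and view can be seen in a small runnable analogue. The sketch below builds a CREATE ... AS SELECT ... UNION statement and runs it against in-memory SQLite. The helper build_union_sql and the table names are hypothetical, and the SQL Parsons generates for Redshift is assumed to be of a similar shape rather than identical.

```python
import sqlite3


def build_union_sql(new_table_name, tables, union_all=True, view=False):
    """Build a CREATE TABLE/VIEW ... AS ... UNION statement (illustrative only)."""
    operator = " UNION ALL " if union_all else " UNION "
    selects = operator.join(f"SELECT * FROM {table}" for table in tables)
    object_type = "VIEW" if view else "TABLE"
    return f"CREATE {object_type} {new_table_name} AS {selects}"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (id INTEGER)")
conn.execute("CREATE TABLE b (id INTEGER)")
conn.executemany("INSERT INTO a VALUES (?)", [(1,), (2,)])
conn.executemany("INSERT INTO b VALUES (?)", [(2,), (3,)])

# union_all=True keeps the duplicate id 2; union_all=False removes it.
conn.execute(build_union_sql("combined_all", ["a", "b"], union_all=True))
conn.execute(build_union_sql("combined_distinct", ["a", "b"], union_all=False))

count_all = conn.execute("SELECT COUNT(*) FROM combined_all").fetchone()[0]
count_distinct = conn.execute("SELECT COUNT(*) FROM combined_distinct").fetchone()[0]
print(count_all, count_distinct)
```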

get_tables(schema=None, table_name=None)

List the tables in a schema including metadata.

Args:
schema: str

Filter by a schema

table_name: str

Filter by a table name

Returns:
Parsons Table

See Parsons Table for output options.

get_table_stats(schema=None, table_name=None)

List table statistics, including row count and size.

Warning

This method is only accessible by Redshift superusers.

Args:
schema: str

Filter by a schema

table_name: str

Filter by a table name

Returns:
Parsons Table

See Parsons Table for output options.

get_columns(schema, table_name)

Gets the column names (and some other column info) for a table.

If you just need the column names, run get_columns_list() as it is faster.

for col in rs.get_columns('some_schema', 'some_table'):
    print(col)
Args:
schema: str

The schema name

table_name: str

The table name

Returns:

A dict mapping column name to a dict with extra info. The keys of the dict are ordered just like the columns in the table. The extra info is a dict with format

{
'data_type': str,
'max_length': int or None,
'max_precision': int or None,
'max_scale': int or None,
'is_nullable': bool
}
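
To show how the returned structure might be consumed, the sketch below formats one line per column from a dict of the shape described above. The helper describe_columns and the sample column values are made up for illustration; real values come from the Redshift catalog.

```python
def describe_columns(columns):
    """Render one 'name type NULL/NOT NULL' line per column (illustrative only)."""
    lines = []
    for name, info in columns.items():
        type_str = info["data_type"]
        if info["max_length"] is not None:
            type_str += f"({info['max_length']})"
        null_str = "NULL" if info["is_nullable"] else "NOT NULL"
        lines.append(f"{name} {type_str} {null_str}")
    return lines


# Hypothetical sample in the documented format; dict order follows column order.
sample_columns = {
    "id": {
        "data_type": "integer",
        "max_length": None,
        "max_precision": None,
        "max_scale": None,
        "is_nullable": False,
    },
    "name": {
        "data_type": "character varying",
        "max_length": 256,
        "max_precision": None,
        "max_scale": None,
        "is_nullable": True,
    },
}

described = describe_columns(sample_columns)
for line in described:
    print(line)
```
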
get_columns_list(schema, table_name)

Gets just the column names for a table.

Args:
schema: str

The schema name

table_name: str

The table name

Returns:

A list of column names.

get_views(schema=None, view=None)

List views.

Args:
schema: str

Filter by a schema

view: str

Filter by a view name

Returns:
Parsons Table

See Parsons Table for output options.

get_queries()

Return the queries that are currently running or queued, along with their resource consumption.

Warning

Must be a Redshift superuser to run this method.

Returns:
Parsons Table

See Parsons Table for output options.

get_max_value(table_name, value_column)

Return the max value from a table.

Args:
table_name: str

Schema and table name

value_column: str

The column containing the values

get_object_type(object_name)

Get object type.

One of view, table, index, sequence, or TOAST table.

Args:
object_name: str

The schema.obj for which to get the object type.

Returns:

str of the object type.

is_view(object_name)

Return true if the object is a view.

Args:
object_name: str

The schema.obj to test if it’s a view.

Returns:

bool

is_table(object_name)

Return true if the object is a table.

Args:
object_name: str

The schema.obj to test if it’s a table.

Returns:

bool

get_table_definition(table)

Get the table definition (i.e. the create statement).

Args:
table: str

The schema.table for which to get the table definition.

Returns:

str

get_table_definitions(schema=None, table=None)

Get the table definition (i.e. the create statement) for multiple tables.

This works similarly to get_table_definition(), except that it runs a single query to get the DDL for multiple tables. It supports SQL wildcards for schema and table. Only returns the DDL for tables that match schema and table, if they exist.

Args:
schema: str

The schema to filter by.

table: str

The table to filter by.

Returns:

list of dicts with matching tables.

get_view_definition(view)

Get the view definition (i.e. the create statement).

Args:
view: str

The schema.view for which to get the view definition.

Returns:

str

get_view_definitions(schema=None, view=None)

Get the view definition (i.e. the create statement) for multiple views.

This works similarly to get_view_definition(), except that it runs a single query to get the DDL for multiple views. It supports SQL wildcards for schema and view. Only returns the DDL for views that match schema and view, if they exist.

Args:
schema: str

The schema to filter by.

view: str

The view to filter by.

Returns:

list of dicts with matching views.

static split_full_table_name(full_table_name)

Split a full table name into its schema and table. If a schema isn’t present, return public for the schema. This mirrors Redshift, which defaults to the public schema when one isn’t provided.

Eg: (schema, table) = Redshift.split_full_table_name("some_schema.some_table")

Args:
full_table_name: str

The table name, as “schema.table”

Returns:
tuple

A tuple containing (schema, table)

static combine_schema_and_table_name(schema, table)

Creates a full table name by combining a schema and table.

Args:
schema: str

The schema name

table: str

The table name

Returns:
str

The combined full table name
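
The splitting and combining rules described for these two static methods are simple enough to sketch in plain Python. The functions split_name and combine_name below are hypothetical stand-ins that follow the documented behaviour (defaulting to the public schema). They are not the Parsons source, which may validate its input differently.

```python
def split_name(full_table_name):
    """Return (schema, table), defaulting the schema to 'public'."""
    if "." not in full_table_name:
        return "public", full_table_name
    schema, table = full_table_name.split(".", 1)
    return schema, table


def combine_name(schema, table):
    """Join a schema and table into a full table name."""
    return f"{schema}.{table}"


with_schema = split_name("some_schema.some_table")
without_schema = split_name("some_table")
round_trip = combine_name(*with_schema)
print(with_schema, without_schema, round_trip)
```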

Schema API

Schema utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftSchema
create_schema_with_permissions(schema, group=None)

Creates a Redshift schema (if it doesn’t already exist), and grants usage permissions to a Redshift group (if specified).

Args:
schema: str

The schema name

group: str

The Redshift group name

type: str

The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)

grant_schema_permissions(schema, group, permissions_type='select')

Grants a Redshift group permissions to all tables within an existing schema.

Args:
schema: str

The schema name

group: str

The Redshift group name

permissions_type: str

The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)
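
Granting a group access to every table in a schema typically takes two Redshift statements: one for usage on the schema and one for the table-level permission. The sketch below assembles both as strings. The helper build_grant_sql and the schema and group names are hypothetical, and the statements Parsons actually runs are assumed to be similar rather than confirmed.

```python
def build_grant_sql(schema, group, permissions_type="select"):
    """Assemble schema-level GRANT statements for a group (illustrative only)."""
    return [
        # The group needs USAGE on the schema before table grants take effect.
        f"GRANT USAGE ON SCHEMA {schema} TO GROUP {group};",
        f"GRANT {permissions_type.upper()} ON ALL TABLES IN SCHEMA {schema} "
        f"TO GROUP {group};",
    ]


grants = build_grant_sql("my_schema", "analysts")
for statement in grants:
    print(statement)
```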