Amazon Web Services

Parsons provides utility functions and/or connectors for three different AWS services.

See the documentation for each service for more details.

Lambda

Overview

Parsons’ distribute_task function allows you to distribute the processing of a table’s rows across multiple AWS Lambda invocations.

If you are running the processing of a table inside AWS Lambda, then you are limited by how many rows can be processed within the Lambda’s time limit (at time-of-writing, maximum 15min).

Based on experience and some napkin math, with the same data that would allow 1000 rows to be processed inside a single AWS Lambda instance, this method allows 10 MILLION rows to be processed.

Rather than converting the table to SQS messages or using other options, the fastest way is to upload the table to S3 and then invoke multiple Lambda sub-invocations, each of which is sent a byte range of the S3 CSV file to process.
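
The byte-range idea can be sketched with the standard library alone. This is an illustration, not Parsons' actual implementation: the helper names row_byte_ranges and rows_for_range are hypothetical, and the sketch assumes no CSV field contains an embedded newline.

```python
import csv
import io

def row_byte_ranges(csv_bytes, group_count):
    """Return (header_end, ranges), where each range covers up to group_count rows.

    Assumes every b"\n" ends a row (no newlines inside quoted fields).
    """
    header_end = csv_bytes.index(b"\n") + 1
    ranges = []
    start = pos = header_end
    rows_in_group = 0
    while pos < len(csv_bytes):
        newline = csv_bytes.find(b"\n", pos)
        pos = len(csv_bytes) if newline == -1 else newline + 1
        rows_in_group += 1
        if rows_in_group == group_count:
            ranges.append((start, pos))
            start = pos
            rows_in_group = 0
    if rows_in_group:
        ranges.append((start, pos))  # final, possibly shorter, group
    return header_end, ranges

def rows_for_range(csv_bytes, header_end, byte_range):
    """Rebuild a small CSV from the header plus one byte range and parse it."""
    start, end = byte_range
    chunk = csv_bytes[:header_end] + csv_bytes[start:end]
    return list(csv.DictReader(io.StringIO(chunk.decode("utf-8"))))

data = b"id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
header_end, ranges = row_byte_ranges(data, group_count=2)
```

Each worker only needs the header bytes and its own range, which is why a single uploaded file can serve many invocations.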

Using this method requires some setup. You have three tasks:

  1. Define the function that processes rows. Its first argument must take your table’s data, though only a subset of rows will be passed on each call (e.g. def task_for_distribution(table, **kwargs):)
  2. Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs) (either setting the env var S3_TEMP_BUCKET or passing a bucket= parameter)
  3. Set up your Lambda handler to include parsons.aws.event_command() (or run and deploy your Lambda with Zappa)

To test locally, include the argument storage="local", which will test the distribute_task function, but run the task sequentially and in local memory.

QuickStart

A minimalistic example Lambda handler might look something like this:

from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    for row in table:
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return
    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz') would timeout
    # so instead we:
    distribute_task(table, process_table,
                    bucket='my-temp-s3-bucket',
                    func_kwargs={'foo': 789, 'bar': 'baz'})

API

parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3')[source]

Distribute processing rows in a table across multiple AWS Lambda invocations.

Args:
table: Parsons Table
Table of data whose processing you wish to distribute across Lambda invocations of func_to_run.
func_to_run: function
The function you want to run whose first argument will be a subset of table
bucket: str
The bucket name to use for the S3 upload when processing the whole table. Not required if you set the environment variable S3_TEMP_BUCKET.
func_kwargs: dict
If the function has other arguments to pass along with table then provide them as a dict here. They must all be JSON-able.
func_class: class
If the function is a classmethod or function on a class, then pass the pure class here. E.g. If you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.
func_class_kwargs: dict
If it is a class function, and the class must be instantiated, then pass the kwargs to instantiate the class here. E.g. If you passed ActionKit.bulk_upload_table as the function, then you would pass {‘domain’: …, ‘username’: … etc} here. This must all be JSON-able data.
catch: bool
Lambda will retry running an event three times if there’s an exception. To prevent this, set catch=True, which catches any errors and stops the retries. The error will appear in the CloudWatch logs with the string “Distribute Error”. This can matter if row actions are not idempotent and a failure in your own function would otherwise cause repeats.
group_count: int
Set this to how many rows to process with each Lambda invocation (Default: 100)
storage: str
Debugging option: Defaults to “s3”. To test distribution locally without s3, set to “local”.
Returns:
Debug information – do not rely on the output, as it will change depending on how this method is invoked.
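
The effect of catch can be pictured with a small wrapper. This is a sketch of the behavior described above, not the library's code: run_chunk and flaky are hypothetical names, and the only detail taken from the documentation is the "Distribute Error" log marker.

```python
import logging

logger = logging.getLogger("distribute_sketch")

def run_chunk(func, rows, catch=False, **func_kwargs):
    """Run func on one chunk of rows, optionally swallowing errors."""
    try:
        return func(rows, **func_kwargs)
    except Exception:
        if not catch:
            raise  # the caller sees a failure, so the event may be retried
        # Log with a searchable marker and return normally so no retry happens.
        logger.exception("Distribute Error")
        return None

def flaky(rows, limit):
    """Example task that fails when given more rows than it can handle."""
    if len(rows) > limit:
        raise ValueError("too many rows")
    return len(rows)
```

With catch=True a failed chunk is logged and skipped, so rows already handled in that chunk are not processed a second time by a retry.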
parsons.aws.event_command(event, context)[source]

Minimal shim to add to the top of the Lambda handler function to enable distributed tasks.

The rest of this library is compatible with zappa.async library. If you have deployed your app with Zappa, then you do NOT need to add this shim.

S3

Overview

The S3 class allows interaction with Amazon Web Service’s object storage service to store and access data objects. It is a wrapper around the AWS SDK boto3. It provides methods to upload and download files from S3 as well as manipulate buckets.

Note

Authentication
Access to S3 is controlled through AWS Identity and Access Management (IAM) users in the AWS Management Console. Users can be granted granular access to AWS resources, including S3. IAM users are provisioned keys, which are required to access the S3 class.

QuickStart

S3 credentials can be passed as environmental variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY), stored in an AWS CLI file ~/.aws/credentials, or passed as keyword arguments.

from parsons import S3, Table

# First approach: Pass API credentials via environmental variables or an AWS CLI file
s3 = S3()

# Second approach: Pass API credentials as arguments
s3 = S3(aws_access_key_id='MY_KEY', aws_secret_access_key='MY_SECRET')

# Put an arbitrary file in an S3 bucket (put_file takes the local path of the file)
s3.put_file('my_bucket', 'winning.csv', 'winning_formula.csv')

# Put a Parsons Table as a CSV using convenience method.
tbl = Table.from_csv('winning_formula.csv')
tbl.to_s3_csv('my_bucket', 'winning.csv')

# Download a csv file and convert to a table (get_file returns the local path)
f = s3.get_file('my_bucket', 'my_dir/my_file.csv')
tbl = Table.from_csv(f)

# List buckets that you have access to
s3.list_buckets()

# List the keys in a bucket
s3.list_keys('my_bucket')

API

class parsons.S3(aws_access_key_id=None, aws_secret_access_key=None)[source]

Instantiate the S3 class.

Args:
aws_access_key_id: str
The AWS access key id. Not required if the AWS_ACCESS_KEY_ID env variable is set.
aws_secret_access_key: str
The AWS secret access key. Not required if the AWS_SECRET_ACCESS_KEY env variable is set.
Returns:
S3 class.
s3 = None

Boto3 API Session Resource object. Use for more advanced boto3 features.

client = None

Boto3 API Session client object. Use for more advanced boto3 features.

list_buckets()[source]

List all buckets to which you have access.

Returns:
list
bucket_exists(bucket)[source]

Determine if a bucket exists and you have access to it.

Args:
bucket: str
The bucket name
Returns:
boolean
True if the bucket exists and False if not.
list_keys(bucket, prefix=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, **kwargs)[source]

List the keys in a bucket, along with extra info about each one.

Args:
bucket: str
The bucket name
prefix: str
Limits the response to keys that begin with the specified prefix.
suffix: str
Limits the response to keys that end with specified suffix
regex: str
Limits the response to keys that match a regex pattern
date_modified_before: datetime.datetime
Limits the response to keys with date modified before
date_modified_after: datetime.datetime
Limits the response to keys with date modified after
kwargs:
Additional arguments for the S3 API call. See AWS ListObjectsV2 documentation for more info.
Returns:
dict
Dict mapping the keys to info about each key. The info includes ‘LastModified’, ‘Size’, and ‘Owner’.
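
To make the filter arguments concrete, here is a sketch that applies them to an in-memory dict shaped like the return value. The filter_keys helper and the sample data are hypothetical. Using re.search for the regex and strict comparisons for the dates are assumptions, and the naive datetimes are a simplification.

```python
import re
from datetime import datetime

def filter_keys(keys, prefix=None, suffix=None, regex=None,
                date_modified_before=None, date_modified_after=None):
    """Filter a {key: info} dict using the argument semantics described above."""
    result = {}
    for key, info in keys.items():
        if prefix and not key.startswith(prefix):
            continue
        if suffix and not key.endswith(suffix):
            continue
        if regex and not re.search(regex, key):
            continue
        modified = info["LastModified"]
        if date_modified_before and not modified < date_modified_before:
            continue
        if date_modified_after and not modified > date_modified_after:
            continue
        result[key] = info
    return result

sample = {
    "logs/2020-01.csv": {"LastModified": datetime(2020, 1, 31), "Size": 10},
    "logs/2020-02.csv": {"LastModified": datetime(2020, 2, 29), "Size": 20},
    "logs/readme.txt": {"LastModified": datetime(2020, 2, 1), "Size": 5},
}
```

The filters combine with AND, so passing both a suffix and a date narrows the result further.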
key_exists(bucket, key)[source]

Determine if a key exists in a bucket.

Args:
bucket: str
The bucket name
key: str
The object key
Returns:
boolean
True if key exists and False if not.
create_bucket(bucket)[source]

Create an s3 bucket.

Warning

S3 has a limit on the number of buckets you can create in an AWS account, and that limit is fairly low (typically 100). If you are creating buckets frequently, you may be mis-using S3, and should consider using the same bucket for multiple tasks. There is no limit on the number of objects in a bucket. See AWS bucket restrictions for more info.

Warning

S3 bucket names are globally unique. So when creating a new bucket, the name can’t collide with any existing bucket names. If the provided name does collide, you’ll see errors like IllegalLocationConstraintException or BucketAlreadyExists.

Args:
bucket: str
The name of the bucket to create
Returns:
None
put_file(bucket, key, local_path, acl='bucket-owner-full-control', **kwargs)[source]

Uploads an object to an S3 bucket

Args:
bucket: str
The bucket name
key: str
The object key
local_path: str
The local path of the file to upload
acl: str
The S3 permissions on the file
kwargs:
Additional arguments for the S3 API call. See AWS Put Object documentation for more info.
remove_file(bucket, key)[source]

Deletes an object from an S3 bucket

Args:
bucket: str
The bucket name
key: str
The object key
Returns:
None
get_file(bucket, key, local_path=None, **kwargs)[source]

Download an object from S3 to a local file

Args:
local_path: str
The local path where the file will be downloaded. If not specified, a temporary file will be created and returned, and that file will be removed automatically when the script is done running.
bucket: str
The bucket name
key: str
The object key
kwargs:
Additional arguments for the S3 API call. See AWS download_file documentation for more info.
Returns:
str
The path of the new file
get_url(bucket, key, expires_in=3600)[source]

Generates a presigned url for an s3 object.

Args:
bucket: str
The bucket name
key: str
The object name
expires_in: int
The time, in seconds, until the url expires
Returns:
Url:
A link to download the object
transfer_bucket(origin_bucket, origin_key, destination_bucket, destination_key=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, public_read=False, remove_original=False, **kwargs)[source]

Transfer files between s3 buckets

Args:
origin_bucket: str
The origin bucket
origin_key: str
The origin file or prefix
destination_bucket: str
The destination bucket
destination_key: str
If None then will retain the origin key. If set to prefix will move all to new prefix
suffix: str
Limits the response to keys that end with specified suffix
regex: str
Limits the response to keys that match a regex pattern
date_modified_before: datetime.datetime
Limits the response to keys with date modified before
date_modified_after: datetime.datetime
Limits the response to keys with date modified after
public_read: bool
If the keys should be set to public-read
remove_original: bool
If the original keys should be removed after transfer
kwargs:
Additional arguments for the S3 API call. See AWS download_file docs for more info.
Returns:
None

Redshift

Overview

The Redshift class allows you to interact with an Amazon Redshift relational database. The connector utilizes the psycopg2 Python package under the hood. The core methods focus on input, output and querying of the database.

In addition to the core API integration provided by the Redshift class, Parsons also includes utility functions for managing schemas and tables. See Table and View API and Schema API for more information.

Note

S3 Credentials
Redshift only allows data to be copied to the database via S3. As such, the copy() and copy_s3() methods require S3 credentials and write access on an S3 bucket, which will be used for storing data en route to Redshift. See the API documentation for more information about AWS Redshift authorization.
Whitelisting
Remember to ensure that the IP address from which you are connecting has been whitelisted.

Quickstart

Redshift API credentials can either be passed as environmental variables (REDSHIFT_USERNAME, REDSHIFT_PASSWORD, REDSHIFT_HOST, REDSHIFT_DB, and REDSHIFT_PORT) or as keyword arguments. Methods that use COPY require an access key ID and a secret access key, which can also be passed as environmental variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or keyword arguments.

from parsons import Redshift

# Pass credentials as environmental variables
rs = Redshift()

# Pass credentials as keyword arguments
rs = Redshift(username='my_username', password='my_password', host='my_host',
              db='my_db', port=5439)

# Query the Database
table = rs.query('select * from tmc_scratch.test_data')

# Copy a Parsons Table to the Database
rs.copy(table, 'tmc_scratch.test_table', if_exists='drop')

All of the standard COPY options can be passed as kwargs. See the copy() method for all options.

Core API

class parsons.Redshift(username=None, password=None, host=None, db=None, port=None, timeout=10, s3_temp_bucket=None, aws_access_key_id=None, aws_secret_access_key=None, iam_role=None)[source]

A Redshift class to connect to database.

Args:
username: str
Required if env variable REDSHIFT_USERNAME not populated
password: str
Required if env variable REDSHIFT_PASSWORD not populated
host: str
Required if env variable REDSHIFT_HOST not populated
db: str
Required if env variable REDSHIFT_DB not populated
port: int
Required if env variable REDSHIFT_PORT not populated. Port 5439 is typical.
timeout: int
Seconds to timeout if connection not established
s3_temp_bucket: str
Name of the S3 bucket that will be used for storing data during bulk transfers. Required if you intend to perform bulk data transfers (eg. the copy_s3 method), and env variable S3_TEMP_BUCKET is not populated.
aws_access_key_id: str
The default AWS access key id for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_ACCESS_KEY_ID.
aws_secret_access_key: str
The default AWS secret access key for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_SECRET_ACCESS_KEY.
iam_role: str
AWS IAM Role ARN string – an optional, different way for credentials to be provided in the Redshift copy command that does not require an access key.
parsons.Redshift.connection(self)

Generate a Redshift connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.

When using the connection, make sure to put it in a with block (necessary for any context manager): with rs.connection() as conn:

Returns:
Psycopg2 connection object
parsons.Redshift.query(self, sql, parameters=None)

Execute a query against the Redshift database. Will return None if the query returns zero rows.

To include python variables in your query, it is recommended to pass them as parameters, following the psycopg style. Using the parameters argument ensures that values are escaped properly, and avoids SQL injection attacks.

Parameter Examples

# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
rs.query(sql, parameters=[name])

# To match against a list of values, build one placeholder per item.
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
rs.query(sql, parameters=names)
Args:
sql: str
A valid SQL statement
parameters: list
A list of python variables to be converted into SQL values in your query
Returns:
Parsons Table
See Parsons Table for output options.
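
The same parameter-passing pattern can be tried without a Redshift cluster. The sketch below uses the standard library's sqlite3 as a stand-in for psycopg2, so the placeholder is ? rather than %s. The table and data are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (name TEXT)")
conn.executemany(
    "INSERT INTO my_table VALUES (?)",
    [("Allen Smith",), ("Beatrice O'Brady",), ("Cathy Thompson",)],
)

# Single value: the driver handles the apostrophe, so no manual escaping.
name = "Beatrice O'Brady"
one = conn.execute("SELECT name FROM my_table WHERE name = ?", [name]).fetchall()

# IN clause: one placeholder per value, with the values passed separately.
names = ["Allen Smith", "Beatrice O'Brady"]
placeholders = ", ".join("?" for _ in names)
many = conn.execute(
    f"SELECT name FROM my_table WHERE name IN ({placeholders}) ORDER BY name",
    names,
).fetchall()
```

Only the placeholders are interpolated into the SQL string. The values themselves never are, which is what keeps the query safe.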
parsons.Redshift.query_with_connection(self, sql, connection, parameters=None, commit=True)

Execute a query against the Redshift database, with an existing connection. Useful for batching queries together. Will return None if the query returns zero rows.

Args:
sql: str
A valid SQL statement
connection: obj
A connection object obtained from redshift.connection()
parameters: list
A list of python variables to be converted into SQL values in your query
commit: boolean
Whether to commit the transaction immediately. If False the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with connection.commit()).
Returns:
Parsons Table
See Parsons Table for output options.
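
The commit flag is easiest to see with several statements sharing one connection. This sketch again substitutes sqlite3 for Redshift, and the run helper is a hypothetical stand-in that imitates the documented arguments rather than the real method.

```python
import sqlite3
from contextlib import closing

def run(conn, sql, parameters=(), commit=True):
    """Hypothetical stand-in for query_with_connection, backed by sqlite3."""
    rows = conn.execute(sql, parameters).fetchall()
    if commit:
        conn.commit()
    return rows or None  # mirror "None if the query returns zero rows"

with closing(sqlite3.connect(":memory:")) as conn:
    run(conn, "CREATE TABLE t (id INTEGER)")
    run(conn, "INSERT INTO t VALUES (?)", [1], commit=False)
    run(conn, "INSERT INTO t VALUES (?)", [2], commit=False)
    pending = conn.in_transaction          # both inserts are still uncommitted
    conn.commit()                          # commit the whole batch at once
    committed = not conn.in_transaction
    result = run(conn, "SELECT id FROM t ORDER BY id")
    empty = run(conn, "SELECT id FROM t WHERE id > 10")
```

Deferring the commit keeps the batch atomic: either every insert lands or none does.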
parsons.Redshift.copy(self, tbl, table_name, if_exists='fail', max_errors=0, distkey=None, sortkey=None, padding=None, statupdate=False, compupdate=True, acceptanydate=True, emptyasnull=True, blanksasnull=True, nullas=None, acceptinvchars=True, dateformat='auto', timeformat='auto', varchar_max=None, truncatecolumns=False, columntypes=None, specifycols=None, alter_table=False, aws_access_key_id=None, aws_secret_access_key=None, iam_role=None, cleanup_s3_file=True, template_table=None, temp_bucket_region=None)

Copy a Parsons Table to Redshift.

Args:
tbl: obj
A Parsons Table.
table_name: str
The destination table name (ex. my_schema.my_table).
if_exists: str
If the table already exists, either fail, append, drop or truncate the table.
max_errors: int
The maximum number of rows that can error and be skipped before the job fails.
distkey: str
The column name of the distkey
sortkey: str
The column name of the sortkey
padding: float
A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.
varchar_max: list
A list of columns in which to set the width of the varchar column to 65,535 characters.
statupdate: boolean
Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command.
compupdate: boolean
Controls whether compression encodings are automatically applied during a COPY.
acceptanydate: boolean
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
emptyasnull: boolean
Indicates that Amazon Redshift should load empty char and varchar fields as NULL.
blanksasnull: boolean
Loads blank varchar fields, which consist of only white space characters, as NULL.
nullas: str
Loads fields that match string as NULL
acceptinvchars: boolean
Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
dateformat: str
Set the date format. Defaults to auto.
timeformat: str
Set the time format. Defaults to auto.
truncatecolumns: boolean
If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
columntypes: dict
Optional map of column name to Redshift column type, overriding the usual type inference. You only specify the columns you want to override, e.g. columntypes={'phone': 'varchar(12)', 'age': 'int'}.
specifycols: boolean

Adds a column list to the Redshift COPY command, allowing the source table in an append to have its columns out of order, and to have fewer columns, with any leftover target table columns filled in with the DEFAULT value.

This will fail if any of the source table’s columns does not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.

alter_table: boolean
Will check if the target table varchar widths are wide enough to copy in the table data. If not, will attempt to alter the table to make it wide enough. This will not work with tables that have dependent views.
aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
iam_role: str
An AWS IAM Role ARN string; an alternative credential for the COPY command from Redshift to S3. The IAM role must have been assigned to the Redshift instance and have access to the S3 bucket.
cleanup_s3_file: boolean
The s3 upload is removed by default on cleanup. You can set to False for debugging.
template_table: str
Instead of specifying columns, columntypes, and/or inference, if there is a pre-existing table that has the same columns/types, then use the template_table table name as the schema for the new table. Unless you set specifycols=False explicitly, a template_table will set it to True
temp_bucket_region: str
The AWS region that the temp bucket (specified by the S3_TEMP_BUCKET environment variable) is located in. This should be provided if the Redshift cluster is located in a different region from the temp bucket.
Returns:
Parsons Table or None
See Parsons Table for output options.
parsons.Redshift.copy_s3(self, table_name, bucket, key, manifest=False, data_type='csv', csv_delimiter=',', compression=None, if_exists='fail', max_errors=0, distkey=None, sortkey=None, padding=None, varchar_max=None, statupdate=True, compupdate=True, ignoreheader=1, acceptanydate=True, dateformat='auto', timeformat='auto', emptyasnull=True, blanksasnull=True, nullas=None, acceptinvchars=True, truncatecolumns=False, columntypes=None, specifycols=None, aws_access_key_id=None, aws_secret_access_key=None, bucket_region=None)

Copy a file from s3 to Redshift.

Args:
table_name: str
The schema and table name (e.g. tmc.cool_table) into which the file will be copied.
bucket: str
The s3 bucket where the file or manifest is located.
key: str
The key of the file or manifest in the s3 bucket.
manifest: boolean
Set to True if the key refers to a manifest file rather than a data file.
data_type: str
The data type of the file. Only csv supported currently.
csv_delimiter: str
The delimiter of the csv. Only relevant if data_type is csv.
compression: str
If specified (gzip), will attempt to decompress the file.
if_exists: str
If the table already exists, either fail, append, drop or truncate the table.
max_errors: int
The maximum number of rows that can error and be skipped before the job fails.
distkey: str
The column name of the distkey
sortkey: str
The column name of the sortkey
padding: float
A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.
varchar_max: list
A list of columns in which to set the width of the varchar column to 65,535 characters.
statupdate: boolean
Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command.
compupdate: boolean
Controls whether compression encodings are automatically applied during a COPY.
ignoreheader: int
The number of header rows to skip. Ignored if data_type is json.
acceptanydate: boolean
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
emptyasnull: boolean
Indicates that Amazon Redshift should load empty char and varchar fields as NULL.
blanksasnull: boolean
Loads blank varchar fields, which consist of only white space characters, as NULL.
nullas: str
Loads fields that match string as NULL
acceptinvchars: boolean
Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
dateformat: str
Set the date format. Defaults to auto.
timeformat: str
Set the time format. Defaults to auto.
truncatecolumns: boolean
If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
columntypes: dict
Optional map of column name to Redshift column type, overriding the usual type inference. You only specify the columns you want to override, e.g. columntypes={'phone': 'varchar(12)', 'age': 'int'}.
specifycols: boolean

Adds a column list to the Redshift COPY command, allowing the source table in an append to have its columns out of order, and to have fewer columns, with any leftover target table columns filled in with the DEFAULT value.

This will fail if any of the source table’s columns does not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.

aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
bucket_region: str
The AWS region that the bucket is located in. This should be provided if the Redshift cluster is located in a different region from the bucket.
Returns:
Parsons Table or None
See Parsons Table for output options.
parsons.Redshift.unload(self, sql, bucket, key_prefix, manifest=True, header=True, delimiter='|', compression='gzip', add_quotes=True, null_as=None, escape=True, allow_overwrite=True, parallel=True, max_file_size='6.2 GB', aws_region=None, aws_access_key_id=None, aws_secret_access_key=None)

Unload Redshift data to S3 Bucket. This is a more efficient method than running a query to export data as it can export in parallel and directly into an S3 bucket. Consider using this for exports of 10MM or more rows.

Args:
sql: str
The SQL string to execute to generate the data to unload.
bucket: str
The destination S3 bucket
key_prefix: str
The prefix of the key names that will be written
manifest: boolean
Creates a manifest file that explicitly lists details for the data files that are created by the UNLOAD process.
header: boolean
Adds a header line containing column names at the top of each output file.
delimiter: str
Specifies the character used to separate fields. Defaults to ‘|’.
compression: str
One of gzip, bzip2 or None. Unloads data to one or more compressed files per slice. Each resulting file is appended with a .gz or .bz2 extension.
add_quotes: boolean
Places quotation marks around each unloaded data field, so that Amazon Redshift can unload data values that contain the delimiter itself.
null_as: str
Specifies a string that represents a null value in unload files. If this option is not specified, null values are unloaded as zero-length strings for delimited output.
escape: boolean
For CHAR and VARCHAR columns in delimited unload files, an escape character (\) is placed before every linefeed, carriage return, escape character and delimiter.
allow_overwrite: boolean
If True, will overwrite existing files, including the manifest file. If False will fail.
parallel: boolean
By default, UNLOAD writes data in parallel to multiple files, according to the number of slices in the cluster. The default option is ON or TRUE. If PARALLEL is OFF or FALSE, UNLOAD writes to one or more data files serially, sorted absolutely according to the ORDER BY clause, if one is used.
max_file_size: str
The maximum size of files UNLOAD creates in Amazon S3. Specify a decimal value between 5 MB and 6.2 GB.
aws_region: str
The AWS Region where the target Amazon S3 bucket is located. REGION is required for UNLOAD to an Amazon S3 bucket that is not in the same AWS Region as the Amazon Redshift cluster.
aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
parsons.Redshift.upsert(self, table_obj, target_table, primary_key, vacuum=True, distinct_check=True, cleanup_temp_table=True, alter_table=True, **copy_args)

Perform an upsert on an existing table. An upsert is an operation in which existing records in a table are updated and new records are inserted in a single step. Unlike in some other SQL databases, it does not exist natively in Redshift.

Args:
table_obj: obj
A Parsons table object
target_table: str
The schema and table name to upsert
primary_key: str or list
The primary key column(s) of the target table
vacuum: boolean
Re-sorts rows and reclaims space in the specified table. You must be a table owner or superuser to effectively vacuum a table; however, the method will not fail if you lack these privileges.
distinct_check: boolean
Check if the primary key column is distinct. Raise error if not.
cleanup_temp_table: boolean
A temp table is dropped by default on cleanup. You can set to False for debugging.
**copy_args: kwargs
See copy() for options.
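
Because there is no native upsert, one common way to emulate it is a staging table followed by a delete and an insert. The sketch below shows that pattern against sqlite3. It is an illustration under that assumption, not a transcript of the SQL Parsons issues, and the upsert helper, table, and rows are hypothetical. Table and column names are interpolated directly, so they must come from trusted input.

```python
import sqlite3

def upsert(conn, rows, target_table, primary_key):
    """Staging-table upsert: delete matching keys, then insert every staged row."""
    keys = [row[primary_key] for row in rows]
    if len(set(keys)) != len(keys):          # analogous to distinct_check=True
        raise ValueError("primary key values are not distinct")
    staging = f"{target_table}_staging"
    columns = list(rows[0].keys())
    col_list = ", ".join(columns)
    marks = ", ".join("?" for _ in columns)
    # Empty temp table with the same columns as the target.
    conn.execute(f"CREATE TEMP TABLE {staging} AS SELECT * FROM {target_table} WHERE 0")
    conn.executemany(
        f"INSERT INTO {staging} ({col_list}) VALUES ({marks})",
        [tuple(row[c] for c in columns) for row in rows],
    )
    conn.execute(
        f"DELETE FROM {target_table} WHERE {primary_key} IN "
        f"(SELECT {primary_key} FROM {staging})"
    )
    conn.execute(f"INSERT INTO {target_table} ({col_list}) SELECT {col_list} FROM {staging}")
    conn.execute(f"DROP TABLE {staging}")    # analogous to cleanup_temp_table=True
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO people VALUES (?, ?)", [(1, "Ann"), (2, "Bob")])
upsert(conn, [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cy"}], "people", "id")
final = conn.execute("SELECT id, name FROM people ORDER BY id").fetchall()
```

Row 2 is replaced, row 3 is added, and row 1 is untouched. The distinctness check matters because duplicate keys in the incoming rows would all be inserted.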
parsons.Redshift.generate_manifest(self, buckets, aws_access_key_id=None, aws_secret_access_key=None, mandatory=True, prefix=None, manifest_bucket=None, manifest_key=None, path=None)

Given a list of S3 buckets, generate a manifest file (JSON format). A manifest file allows you to copy multiple files into a single table at once. Once the manifest is generated, you can pass it with the copy_s3() method.

AWS keys are not required if the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environmental variables are set.

Args:

buckets: list or str
A list of buckets or single bucket from which to generate manifest
aws_access_key_id: str
AWS access key id to access S3 bucket
aws_secret_access_key: str
AWS secret access key to access S3 bucket
mandatory: boolean
The mandatory flag indicates whether the Redshift COPY should terminate if the file does not exist.
prefix: str
Optional filter for key prefixes
manifest_bucket: str
Optional bucket to write manifest file.
manifest_key: str
Optional key name for S3 bucket to write file
Returns:
dict of manifest
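
For orientation, a Redshift COPY manifest is a JSON document with an "entries" list, where each entry has a "url" and a "mandatory" flag. The sketch below builds one from an in-memory listing. The build_manifest helper and the bucket and key names are hypothetical, and the real method discovers the keys by listing the buckets.

```python
import json

def build_manifest(keys_by_bucket, mandatory=True):
    """Build a COPY manifest dict from a {bucket: [keys]} mapping."""
    entries = [
        {"url": f"s3://{bucket}/{key}", "mandatory": mandatory}
        for bucket, keys in keys_by_bucket.items()
        for key in keys
    ]
    return {"entries": entries}

manifest = build_manifest({"my_bucket": ["part_0.csv", "part_1.csv"]})
manifest_json = json.dumps(manifest, indent=2)
```

With mandatory set to True, COPY stops with an error if any listed file is missing, rather than silently loading a partial set.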
parsons.Redshift.alter_table_column_type(self, table_name, column_name, data_type, varchar_width=None)

Alter a column type of an existing table.

Args:
table_name: str
The table name (ex. my_schema.my_table).
column_name: str
The target column name
data_type: str
A valid Redshift data type to alter the table to.
varchar_width: int
The new width of the column if of type varchar.
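
A typical use is widening a varchar column that is too narrow for incoming data, which is also the situation the alter_table option of copy() describes. The sketch below only builds the SQL strings. The widen_statements helper is hypothetical, the statement follows Redshift's ALTER TABLE ... ALTER COLUMN ... TYPE syntax, and the exact SQL Parsons generates may differ.

```python
def widen_statements(table_name, current_widths, rows):
    """Return ALTER statements for varchar columns narrower than the data.

    current_widths maps column name to its current varchar width.
    Lengths are measured in UTF-8 bytes, since Redshift varchar widths are bytes.
    """
    statements = []
    for column, width in current_widths.items():
        longest = max(
            (len(str(row[column]).encode("utf-8"))
             for row in rows if row.get(column) is not None),
            default=0,
        )
        if longest > width:
            statements.append(
                f"ALTER TABLE {table_name} ALTER COLUMN {column} TYPE varchar({longest})"
            )
    return statements

needed = widen_statements(
    "my_schema.my_table",
    {"name": 5, "city": 10},
    [{"name": "Alexandria", "city": "Rome"}, {"name": "Bo", "city": None}],
)
```

Only columns that are actually too narrow produce a statement, so a table that already fits the data yields an empty list.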

Table and View API

Table and view utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftTableUtilities[source]
table_exists(table_name, view=True)[source]

Check if a table or view exists in the database.

Args:
table_name: str
The table name and schema (e.g. myschema.mytable).
view: boolean
Check to see if a view exists by the same name
Returns:
boolean
True if the table exists and False if it does not.
get_row_count(table_name)[source]

Return the row count of a table.

SQL Code

SELECT COUNT(*) FROM myschema.mytable
Args:
table_name: str
The schema and name (e.g. myschema.mytable) of the table.
Returns:
int
rename_table(table_name, new_table_name)[source]

Rename an existing table.

Note

You cannot move schemas when renaming a table. Instead, use the duplicate_table() method.

Args:
table_name: str
Name of existing schema and table (e.g. myschema.oldtable)
new_table_name: str
New name for table with the schema omitted (e.g. newtable).
move_table(source_table, new_table, drop_source_table=False)[source]

Move an existing table in the database. It will inherit encoding, sortkey and distkey. Once run, the source table will be empty. This is more efficient than running "create newtable as select * from oldtable".

For more information see: ALTER TABLE APPEND

Args:
source_table: str
Name of existing schema and table (e.g. my_schema.old_table)
new_table: str
New name of schema and table (e.g. my_schema.newtable)
drop_source_table: boolean
Drop the source table.
Returns:
None
populate_table_from_query(query, destination_table, if_exists='fail', distkey=None, sortkey=None)[source]

Populate a Redshift table with the results of a SQL query, creating the table if it doesn’t yet exist.

Args:
query: str
The SQL query
destination_table: str
Name of destination schema and table (e.g. my_schema.new_table)
if_exists: str
If the table already exists, either fail, append, drop, or truncate the table.
distkey: str
The column to use as the distkey for the table.
sortkey: str
The column to use as the sortkey for the table.
duplicate_table(source_table, destination_table, where_clause='', if_exists='fail', drop_source_table=False)[source]

Create a copy of an existing table (or subset of rows) in a new table. It will inherit encoding, sortkey and distkey.

Args:
source_table: str
Name of existing schema and table (e.g. myschema.oldtable)
destination_table: str
Name of destination schema and table (e.g. myschema.newtable)
where_clause: str
An optional where clause (e.g. where org = 1).
if_exists: str
If the table already exists, either fail, append, drop, or truncate the table.
drop_source_table: boolean
Drop the source table
union_tables(new_table_name, tables, union_all=True, view=False)[source]

Union a series of tables into a new table.

Args:
new_table_name: str
The new table and schema (e.g. myschema.newtable)
tables: list
A list of tables to union
union_all: boolean
If False will deduplicate rows. If True will include duplicate rows.
view: boolean
Create a view rather than a static table
Returns:
None
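
To make the union_all and view flags concrete, here is a minimal sketch of the statement they imply. The helper build_union_sql is hypothetical and only approximates what the method might run; it assumes every table has a compatible column layout.

```python
def build_union_sql(new_table_name, tables, union_all=True, view=False):
    """Hypothetical helper (not part of Parsons): build a CREATE ... AS union."""
    if not tables:
        raise ValueError("At least one table is required.")
    # UNION ALL keeps duplicate rows; a plain UNION removes them.
    joiner = " union all " if union_all else " union "
    selects = joiner.join(f"select * from {table}" for table in tables)
    # A view is re-evaluated on each read; a table is a static snapshot.
    object_type = "view" if view else "table"
    return f"create {object_type} {new_table_name} as ({selects});"


print(build_union_sql("myschema.newtable", ["myschema.a", "myschema.b"]))
```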
get_tables(schema=None, table_name=None)[source]

List the tables in a schema including metadata.

Args:
schema: str
Filter by a schema
table_name: str
Filter by a table name
Returns:
Parsons Table
See Parsons Table for output options.
get_table_stats(schema=None, table_name=None)[source]

List table statistics, including row count and size.

Warning

This method is only accessible by Redshift superusers.

Args:
schema: str
Filter by a schema
table_name: str
Filter by a table name
Returns:
Parsons Table
See Parsons Table for output options.
get_columns(schema, table_name)[source]

Gets the column names (and some other column info) for a table.

If you just need the column names, use get_columns_list(), which is faster.

for col in rs.get_columns('some_schema', 'some_table'):
    print(col)
Args:
schema: str
The schema name
table_name: str
The table name
Returns:

A dict mapping column name to a dict with extra info. The keys of the dict are ordered just like the columns in the table. The extra info is a dict with format

{
'data_type': str,
'max_length': int or None,
'max_precision': int or None,
'max_scale': int or None,
'is_nullable': bool
}
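
The returned mapping can be filtered like any other dict. The sketch below uses a hand-written sample in the documented format (the column names and values are made up for illustration and do not come from a real table) to pick out non-nullable columns and columns with a declared length.

```python
# Hand-written sample in the documented format; real values would come
# from a call such as rs.get_columns('some_schema', 'some_table').
columns = {
    "id": {
        "data_type": "integer", "max_length": None,
        "max_precision": 32, "max_scale": 0, "is_nullable": False,
    },
    "email": {
        "data_type": "character varying", "max_length": 256,
        "max_precision": None, "max_scale": None, "is_nullable": True,
    },
}

# Columns that must always have a value.
required = [name for name, info in columns.items() if not info["is_nullable"]]

# Declared lengths, for columns that have one.
text_widths = {
    name: info["max_length"]
    for name, info in columns.items()
    if info["max_length"] is not None
}

print(required, text_widths)
```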
get_columns_list(schema, table_name)[source]

Gets just the column names for a table.

Args:
schema: str
The schema name
table_name: str
The table name
Returns:
A list of column names.
get_views(schema=None, view=None)[source]

List views.

Args:
schema: str
Filter by a schema
view: str
Filter by a view name
Returns:
Parsons Table
See Parsons Table for output options.
get_queries()[source]

Return the queries that are currently running or queued, along with their resource consumption.

Warning

Must be a Redshift superuser to run this method.

Returns:
Parsons Table
See Parsons Table for output options.
get_max_value(table_name, value_column)[source]

Return the max value from a table.

Args:
table_name: str
Schema and table name
value_column: str
The column containing the values
get_object_type(object_name)[source]

Get object type.

One of view, table, index, sequence, or TOAST table.

Args:
object_name: str
The schema.obj for which to get the object type.
Returns:
str of the object type.
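
The five kinds listed above correspond to the relkind codes in the PostgreSQL-style pg_class catalog. The sketch below shows that correspondence. It is an assumption that get_object_type() derives its answer from relkind, and the exact strings it returns are not specified here; the names RELKIND_TO_OBJECT_TYPE and describe_relkind are hypothetical.

```python
# pg_class.relkind codes for the object kinds named in the description.
RELKIND_TO_OBJECT_TYPE = {
    "r": "table",
    "v": "view",
    "i": "index",
    "S": "sequence",
    "t": "TOAST table",
}


def describe_relkind(relkind):
    """Hypothetical helper: return a readable kind, or None if the code is unknown."""
    return RELKIND_TO_OBJECT_TYPE.get(relkind)


print(describe_relkind("v"))
```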
is_view(object_name)[source]

Return true if the object is a view.

Args:
object_name: str
The schema.obj to test if it’s a view.
Returns:
bool
is_table(object_name)[source]

Return true if the object is a table.

Args:
object_name: str
The schema.obj to test if it’s a table.
Returns:
bool
get_table_definition(table)[source]

Get the table definition (i.e. the create statement).

Args:
table: str
The schema.table for which to get the table definition.
Returns:
str
get_table_definitions(schema=None, table=None)[source]

Get the table definition (i.e. the create statement) for multiple tables.

This works similarly to get_table_definition(), except that it runs a single query to get the DDL for multiple tables. It supports SQL wildcards for schema and table. It only returns the DDL for _tables_ that match schema and table, if they exist.

Args:
schema: str
The schema to filter by.
table: str
The table to filter by.
Returns:
list of dicts with matching tables.
get_view_definition(view)[source]

Get the view definition (i.e. the create statement).

Args:
view: str
The schema.view for which to get the view definition.
Returns:
str
get_view_definitions(schema=None, view=None)[source]

Get the view definition (i.e. the create statement) for multiple views.

This works similarly to get_view_definition(), except that it runs a single query to get the DDL for multiple views. It supports SQL wildcards for schema and view. It only returns the DDL for _views_ that match schema and view, if they exist.

Args:
schema: str
The schema to filter by.
view: str
The view to filter by.
Returns:
list of dicts with matching views.
static split_full_table_name(full_table_name)[source]

Split a full table name into its schema and table. If a schema isn’t present, return public for the schema, mirroring Redshift, which defaults to the public schema when one isn’t provided.

Eg: (schema, table) = Redshift.split_full_table_name("some_schema.some_table")

Args:
full_table_name: str
The table name, as “schema.table”
Returns:
tuple
A tuple containing (schema, table)
static combine_schema_and_table_name(schema, table)[source]

Creates a full table name by combining a schema and table.

Args:
schema: str
The schema name
table: str
The table name
Returns:
str
The combined full table name
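
The two static helpers are inverses of each other. The following stand-ins reproduce the documented behaviour in plain Python so it can be tried without a database connection; split_name and combine_name are hypothetical names, and this is not Parsons' own code.

```python
def split_name(full_table_name, default_schema="public"):
    """Illustrative stand-in for the documented split behaviour."""
    if "." not in full_table_name:
        # No schema given: fall back to the default, as Redshift does.
        return default_schema, full_table_name
    # Split on the first dot only; how names with several dots are
    # handled by the real method is not covered by this sketch.
    schema, _, table = full_table_name.partition(".")
    return schema, table


def combine_name(schema, table):
    """Illustrative stand-in for the documented combine behaviour."""
    return f"{schema}.{table}"


schema, table = split_name("some_schema.some_table")
print(combine_name(schema, table))
```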

Schema API

Schema utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.

class parsons.databases.redshift.redshift.RedshiftSchema[source]
create_schema_with_permissions(schema, group=None)[source]

Creates a Redshift schema (if it doesn’t already exist), and grants usage permissions to a Redshift group (if specified).

Args:
schema: str
The schema name
group: str
The Redshift group name
type: str
The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)
grant_schema_permissions(schema, group, permissions_type='select')[source]

Grants a Redshift group permissions to all tables within an existing schema.

Args:
schema: str
The schema name
group: str
The Redshift group name
permissions_type: str
The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)
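
For orientation, granting a group access to every table in a schema typically takes two statements in Redshift: one for usage on the schema and one for the table-level privilege. The sketch below builds both. The helper build_grant_sql is hypothetical and may not match the SQL Parsons actually runs.

```python
def build_grant_sql(schema, group, permissions_type="select"):
    """Hypothetical helper (not part of Parsons): SQL for schema-wide group grants."""
    return (
        # Without usage on the schema, table-level grants cannot be exercised.
        f"grant usage on schema {schema} to group {group};\n"
        # Applies to tables that exist now, not to tables created later.
        f"grant {permissions_type} on all tables in schema {schema} to group {group};"
    )


print(build_grant_sql("myschema", "analysts"))
```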