Amazon Web Services
Parsons provides utility functions and/or connectors for three different AWS services:
- Lambda: AWS’s serverless computing service
- S3: AWS’s object storage service
- Redshift: AWS’s data warehousing service, with two additional classes providing utility functions:
  - Table and View API: Methods for managing tables and views
  - Schema API: Methods for managing schemas
See the documentation for each service for more details.
Lambda
Overview
Parsons’ distribute_task
function allows you to distribute the processing of a table’s rows across multiple
AWS Lambda invocations.
If you process a table inside a single AWS Lambda invocation, you are limited by how many rows can be handled within the Lambda’s time limit (at time of writing, a maximum of 15 minutes).
Based on experience and some napkin math, with the same data that would allow 1,000 rows to be processed inside a single AWS Lambda invocation, this method allows 10 MILLION rows to be processed.
Rather than converting the table to SQS or other options, the fastest approach is to upload the table to S3 and then trigger multiple Lambda sub-invocations, each of which is sent a byte range of the S3 CSV file to process.
Using this method requires some setup. You have three tasks:
1. Define the function that will process the rows. Its first argument must accept your table’s data (though only a subset of rows will be passed on each invocation), e.g. def task_for_distribution(table, **kwargs):
2. Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs) (either setting the env var S3_TEMP_BUCKET or passing a bucket= parameter).
3. Set up your Lambda handler to include parsons.aws.event_command() (or run and deploy your Lambda with Zappa).
To test locally, include the argument storage="local", which will exercise the distribute_task function but run the task sequentially and in local memory.
QuickStart
A minimalistic example Lambda handler might look something like this:
from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    for row in table:
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return

    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz')
    # would timeout, so instead we:
    distribute_task(table, process_table,
                    bucket='my-temp-s3-bucket',
                    func_kwargs={'foo': 789, 'bar': 'baz'})
API
- parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3', use_s3_env_token=True)[source]
Distribute processing rows in a table across multiple AWS Lambda invocations.
- Args:
- table: Parsons Table
Table of data you wish to distribute processing across Lambda invocations of func_to_run argument.
- func_to_run: function
The function you want to run, whose first argument will be a subset of table.
- bucket: str
The S3 bucket name to use for uploading the table data for processing. Not required if you set the environment variable
S3_TEMP_BUCKET
- func_kwargs: dict
If the function has other arguments to pass along with table then provide them as a dict here. They must all be JSON-able.
- func_class: class
If the function is a classmethod or function on a class, then pass the pure class here. E.g. If you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.
- func_class_kwargs: dict
If it is a class function, and the class must be instantiated, then pass the kwargs to instantiate the class here. E.g. If you passed ActionKit.bulk_upload_table as the function, then you would pass {‘domain’: …, ‘username’: … etc} here. This must all be JSON-able data.
- catch: bool
Lambda will retry running an event three times if there’s an exception. If you want to prevent this, set catch=True and it will catch any errors and stop retries. The error will appear in CloudWatch logs with the string “Distribute Error”. This might be important if row actions are not idempotent and your own function might fail, causing repeats.
- group_count: int
Set this to how many rows to process with each Lambda invocation (Default: 100)
- storage: str
Debugging option: Defaults to “s3”. To test distribution locally without s3, set to “local”.
- use_s3_env_token: bool
If storage is set to “s3”, sets the use_env_token parameter on the S3 storage.
- Returns:
Debug information – do not rely on the output, as it will change depending on how this method is invoked.
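For example, to distribute a method that lives on a class, you can combine func_class and func_class_kwargs. The sketch below builds on the ActionKit example from the argument descriptions above; the table contents, bucket name, and credentials are placeholder values, and it assumes the class method accepts the table as its first argument:

from parsons import ActionKit, Table
from parsons.aws import distribute_task

tbl = Table([{'email': 'a@example.org'}, {'email': 'b@example.org'}])

distribute_task(tbl,
                ActionKit.bulk_upload_table,    # func_to_run: a method defined on a class
                bucket='my-temp-s3-bucket',     # placeholder bucket name
                func_class=ActionKit,           # the class itself
                func_class_kwargs={'domain': 'myorg.actionkit.com',
                                   'username': 'me',
                                   'password': 'secret'},
                group_count=500)                # rows per Lambda sub-invocation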
S3
Overview
The S3
class allows interaction with Amazon Web Service’s object storage service
to store and access data objects. It is a wrapper around the AWS SDK boto3.
It provides methods to upload and download files from S3 as well as manipulate buckets.
Note
- Authentication
Access to S3 is controlled through AWS Identity and Access Management (IAM) users in the AWS Management Console. Users can be granted granular access to AWS resources, including S3. IAM users are provisioned keys, which are required to access the S3 class.
QuickStart
S3 credentials can be passed as environmental variables (AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
),
stored in an AWS CLI file ~/.aws/credentials
, or passed as keyword arguments.
from parsons import S3, Table
# First approach: Pass API credentials via environmental variables or an AWS CLI file
s3 = S3()
# Second approach: Pass API credentials as arguments
s3 = S3(aws_access_key_id='MY_KEY', aws_secret_access_key='MY_SECRET')
# Put an arbitrary file in an S3 bucket
s3.put_file('my_bucket', 'winning.csv', 'winning_formula.csv')
# Put a Parsons Table as a CSV using convenience method.
tbl = Table.from_csv('winning_formula.csv')
tbl.to_s3_csv('my_bucket', 'winning.csv')
# Download a csv file and convert to a table
f = s3.get_file('my_bucket', 'my_dir/my_file.csv')
tbl = Table(f)
# List buckets that you have access to
s3.list_buckets()
# List the keys in a bucket
s3.list_keys('my_bucket')
API
- class parsons.S3(aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, use_env_token=True)[source]
Instantiate the S3 class.
- Args:
- aws_access_key_id: str
The AWS access key id. Not required if the
AWS_ACCESS_KEY_ID
env variable is set.- aws_secret_access_key: str
The AWS secret access key. Not required if the
AWS_SECRET_ACCESS_KEY
env variable is set.- aws_session_token: str
The AWS session token. Optional. Can also be stored in the
AWS_SESSION_TOKEN
env variable. Used for accessing S3 with temporary credentials.- use_env_token: boolean
Controls use of the
AWS_SESSION_TOKEN
environment variable. Defaults toTrue
. Set toFalse
in order to ignore theAWS_SESSION_TOKEN
environment variable even if theaws_session_token
argument was not passed in.
- Returns:
S3 class.
- s3
Boto3 API Session Resource object. Use for more advanced boto3 features.
- client
Boto3 API Session client object. Use for more advanced boto3 features.
- bucket_exists(bucket)[source]
Determine if a bucket exists and you have access to it.
- Args:
- bucket: str
The bucket name
- Returns:
- boolean
True
if the bucket exists andFalse
if not.
- list_keys(bucket, prefix=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, **kwargs)[source]
List the keys in a bucket, along with extra info about each one.
- Args:
- bucket: str
The bucket name
- prefix: str
Limits the response to keys that begin with the specified prefix.
- suffix: str
Limits the response to keys that end with specified suffix
- regex: str
Limits the response to keys that match a regex pattern
- date_modified_before: datetime.datetime
Limits the response to keys with date modified before
- date_modified_after: datetime.datetime
Limits the response to keys with date modified after
- kwargs:
Additional arguments for the S3 API call. See AWS ListObjectsV2 documentation for more info.
- Returns:
- dict
Dict mapping the keys to info about each key. The info includes ‘LastModified’, ‘Size’, and ‘Owner’.
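For instance, assuming s3 is an instantiated S3 object as in the QuickStart (the bucket name and prefix below are hypothetical), you can combine the filters above:

import datetime

keys = s3.list_keys('my_bucket',
                    prefix='exports/2023/',
                    suffix='.csv',
                    date_modified_after=datetime.datetime(2023, 6, 1, tzinfo=datetime.timezone.utc))
for key, info in keys.items():
    print(key, info['Size'], info['LastModified'])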
- key_exists(bucket, key)[source]
Determine if a key exists in a bucket.
- Args:
- bucket: str
The bucket name
- key: str
The object key
- Returns:
- boolean
True
if key exists andFalse
if not.
- create_bucket(bucket)[source]
Create an s3 bucket.
Warning
S3 has a limit on the number of buckets you can create in an AWS account, and that limit is fairly low (typically 100). If you are creating buckets frequently, you may be mis-using S3, and should consider using the same bucket for multiple tasks. There is no limit on the number of objects in a bucket. See AWS bucket restrictions for more info.
Warning
S3 bucket names are globally unique. So when creating a new bucket, the name can’t collide with any existing bucket names. If the provided name does collide, you’ll see errors like IllegalLocationConstraintException or BucketAlreadyExists.
- Args:
- bucket: str
The name of the bucket to create
- Returns:
None
- put_file(bucket, key, local_path, acl='bucket-owner-full-control', **kwargs)[source]
Uploads an object to an S3 bucket
- Args:
- bucket: str
The bucket name
- key: str
The object key
- local_path: str
The local path of the file to upload
- acl: str
The S3 permissions on the file
- kwargs:
Additional arguments for the S3 API call. See AWS Put Object documentation for more info.
- remove_file(bucket, key)[source]
Deletes an object from an S3 bucket
- Args:
- bucket: str
The bucket name
- key: str
The object key
- Returns:
None
- get_file(bucket, key, local_path=None, **kwargs)[source]
Download an object from S3 to a local file
- Args:
- local_path: str
The local path where the file will be downloaded. If not specified, a temporary file will be created and returned, and that file will be removed automatically when the script is done running.
- bucket: str
The bucket name
- key: str
The object key
- kwargs:
Additional arguments for the S3 API call. See AWS download_file documentation for more info.
- Returns:
- str
The path of the new file
- get_url(bucket, key, expires_in=3600)[source]
Generates a presigned url for an s3 object.
- Args:
- bucket: str
The bucket name
- key: str
The object name
- expires_in: int
The time, in seconds, until the url expires
- Returns:
- Url:
A link to download the object
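For example, to generate a link that expires after 15 minutes (bucket and key are hypothetical):

url = s3.get_url('my_bucket', 'exports/winning.csv', expires_in=900)
print(url)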
- transfer_bucket(origin_bucket, origin_key, destination_bucket, destination_key=None, suffix=None, regex=None, date_modified_before=None, date_modified_after=None, public_read=False, remove_original=False, **kwargs)[source]
Transfer files between s3 buckets
- Args:
- origin_bucket: str
The origin bucket
- origin_key: str
The origin file or prefix
- destination_bucket: str
The destination bucket
- destination_key: str
If None, the origin key is retained. If set to a prefix, all files will be moved under the new prefix.
- suffix: str
Limits the response to keys that end with specified suffix
- regex: str
Limits the response to keys that match a regex pattern
- date_modified_before: datetime.datetime
Limits the response to keys with date modified before
- date_modified_after: datetime.datetime
Limits the response to keys with date modified after
- public_read: bool
If the keys should be set to public-read
- remove_original: bool
If the original keys should be removed after transfer
- kwargs:
Additional arguments for the S3 API call. See AWS download_file docs for more info.
- Returns:
None
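As a sketch (bucket names and prefix are hypothetical), the following copies every CSV under a prefix to another bucket and removes the originals:

s3.transfer_bucket(origin_bucket='my-staging-bucket',
                   origin_key='exports/',             # a prefix: transfers every matching key under it
                   destination_bucket='my-archive-bucket',
                   suffix='.csv',
                   remove_original=True)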
Temporary Credentials
The S3 API supports creating temporary credentials for one-off operations, such as pushing a file to a particular key in a particular bucket.
For example, the Mapbox API allows you to request temporary credentials that grant you access to a bucket where you can upload map data.
When S3 returns a set of temporary credentials it also returns a session token that needs to be included with the standard credentials for
them to be accepted. The S3
class can be passed a session token as an environmental variable (AWS_SESSION_TOKEN
) or as a keyword argument.
from parsons import S3
# First approach: Pass session token via AWS_SESSION_TOKEN environmental variable
s3 = S3()
# Second approach: Pass session token as an argument
creds = request_temporary_credentials()
s3 = S3(aws_access_key_id=creds['id'], aws_secret_access_key=creds['key'],
aws_session_token=creds['token'])
Redshift
Overview
The Redshift
class allows you to interact with an Amazon Redshift relational database.
The connector utilizes the psycopg2 Python package under the hood. The core methods
focus on input, output and querying of the database.
In addition to the core API integration provided by the Redshift
class, Parsons also includes utility functions for
managing schemas and tables. See Table and View API and Schema API for more information.
Note
- S3 Credentials
Redshift only allows data to be copied to the database via S3. As such, the
copy()
andcopy_s3()
methods require S3 credentials and write access on an S3 Bucket, which will be used for storing data en route to Redshift. See the API documentation for more information about AWS Redshift authorization.- Whitelisting
Remember to ensure that the IP address from which you are connecting has been whitelisted.
Quickstart
Redshift API credentials can either be passed as environmental variables (REDSHIFT_USERNAME
, REDSHIFT_PASSWORD
,
REDSHIFT_HOST
, REDSHIFT_DB
, and REDSHIFT_PORT
) or as keyword arguments. Methods that use COPY require an
access key ID and a secret access key,
which can also be passed as environmental variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY)
or keyword arguments.
from parsons import Redshift
# Pass credentials as environmental variables
rs = Redshift()
# Pass credentials as keyword arguments
rs = Redshift(username='my_username', password='my_password', host='my_host',
db='my_db', port='5439')
# Query the Database
table = rs.query('select * from tmc_scratch.test_data')
# Copy a Parsons Table to the Database
table = rs.copy(tbl, 'tmc_scratch.test_table', if_exists='drop')
All of the standard COPY options can be passed as kwargs. See the copy()
method for all
options.
Core API
- class parsons.Redshift(username=None, password=None, host=None, db=None, port=None, timeout=10, s3_temp_bucket=None, aws_access_key_id=None, aws_secret_access_key=None, iam_role=None, use_env_token=True)[source]
A Redshift class to connect to the database.
- Args:
- username: str
Required if env variable
REDSHIFT_USERNAME
not populated- password: str
Required if env variable
REDSHIFT_PASSWORD
not populated- host: str
Required if env variable
REDSHIFT_HOST
not populated- db: str
Required if env variable
REDSHIFT_DB
not populated- port: int
Required if env variable
REDSHIFT_PORT
not populated. Port 5439 is typical.- timeout: int
Seconds to timeout if connection not established
- s3_temp_bucket: str
Name of the S3 bucket that will be used for storing data during bulk transfers. Required if you intend to perform bulk data transfers (eg. the copy_s3 method), and env variable
S3_TEMP_BUCKET
is not populated.- aws_access_key_id: str
The default AWS access key id for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_ACCESS_KEY_ID.
- aws_secret_access_key: str
The default AWS secret access key for copying data from S3 into Redshift when running copy/upsert/etc methods. This will default to environment variable AWS_SECRET_ACCESS_KEY.
- iam_role: str
AWS IAM Role ARN string – an optional, different way for credentials to be provided in the Redshift copy command that does not require an access key.
- use_env_token: bool
Controls use of the
AWS_SESSION_TOKEN
environment variable for S3. Defaults toTrue
. Set toFalse
in order to ignore theAWS_SESSION_TOKEN
environment variable even if theaws_session_token
argument was not passed in.
- parsons.Redshift.connection(self)
Generate a Redshift connection. The connection is set up as a python “context manager”, so it will be closed automatically (and all queries committed) when the connection goes out of scope.
When using the connection, make sure to put it in a
with
block (necessary for any context manager):with rs.connection() as conn:
- Returns:
Psycopg2
connection
object
- parsons.Redshift.query(self, sql: str, parameters: Optional[list] = None) Optional[Table]
Execute a query against the Redshift database. Will return
None
if the query returns zero rows.To include python variables in your query, it is recommended to pass them as parameters, following the psycopg style. Using the
parameters
argument ensures that values are escaped properly, and avoids SQL injection attacks.Parameter Examples
# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
rs.query(sql, parameters=[name])

names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
rs.query(sql, parameters=names)
- Args:
- sql: str
A valid SQL statement
- parameters: list
A list of python variables to be converted into SQL values in your query
- Returns:
- Parsons Table
See Parsons Table for output options.
- parsons.Redshift.query_with_connection(self, sql, connection, parameters=None, commit=True)
Execute a query against the Redshift database, with an existing connection. Useful for batching queries together. Will return
None
if the query returns zero rows.- Args:
- sql: str
A valid SQL statement
- connection: obj
A connection object obtained from
redshift.connection()
- parameters: list
A list of python variables to be converted into SQL values in your query
- commit: boolean
Whether to commit the transaction immediately. If
False
the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually withconnection.commit()
).
- Returns:
- Parsons Table
See Parsons Table for output options.
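A sketch of batching several statements on one connection (schema and table names are hypothetical); the intermediate statements return None, and everything is committed when the connection goes out of scope:

with rs.connection() as conn:
    rs.query_with_connection("create temp table stage as select * from my_schema.raw_data", conn, commit=False)
    rs.query_with_connection("delete from stage where email is null", conn, commit=False)
    result = rs.query_with_connection("select count(*) from stage", conn)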
- parsons.Redshift.copy(self, tbl: Table, table_name: str, if_exists: str = 'fail', max_errors: int = 0, distkey: Optional[str] = None, sortkey: Optional[str] = None, padding: Optional[float] = None, statupdate: Optional[bool] = None, compupdate: Optional[bool] = None, acceptanydate: bool = True, emptyasnull: bool = True, blanksasnull: bool = True, nullas: Optional[str] = None, acceptinvchars: bool = True, dateformat: str = 'auto', timeformat: str = 'auto', varchar_max: Optional[List[str]] = None, truncatecolumns: bool = False, columntypes: Optional[dict] = None, specifycols: Optional[bool] = None, alter_table: bool = False, alter_table_cascade: bool = False, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, iam_role: Optional[str] = None, cleanup_s3_file: bool = True, template_table: Optional[str] = None, temp_bucket_region: Optional[str] = None, strict_length: bool = True, csv_encoding: str = 'utf-8')
Copy a Parsons Table to Redshift.
- Args:
- tbl: obj
A Parsons Table.
- table_name: str
The destination table name (ex.
my_schema.my_table
).- if_exists: str
If the table already exists, either
fail
,append
,drop
ortruncate
the table.- max_errors: int
The maximum number of rows that can error and be skipped before the job fails.
- distkey: str
The column name of the distkey
- sortkey: str
The column name of the sortkey
- padding: float
A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.
- statupdate: boolean
Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command. If True, explicitly sets statupdate to on; if False, explicitly sets statupdate to off. If None, stats update only if the table is initially empty. Defaults to None. See Redshift docs for more details.
Note
If STATUPDATE is used, the current user must be either the table owner or a superuser.
- compupdate: boolean
Controls whether compression encodings are automatically applied during a COPY. If
True
explicitly setscompupdate
to on, ifFalse
explicitly setscompupdate
to off. IfNone
the COPY command only chooses compression if the table is initially empty. Defaults toNone
. See Redshift docs for more details.- acceptanydate: boolean
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
- emptyasnull: boolean
Indicates that Amazon Redshift should load empty char and varchar fields as
NULL
.- blanksasnull: boolean
Loads blank varchar fields, which consist of only white space characters, as
NULL
.- nullas: str
Loads fields that match string as NULL
- acceptinvchars: boolean
Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
- dateformat: str
Set the date format. Defaults to
auto
.- timeformat: str
Set the time format. Defaults to
auto
.- varchar_max: list
A list of columns in which to set the width of the varchar column to 65,535 characters.
- truncatecolumns: boolean
If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
- columntypes: dict
Optional map of column name to redshift column type, overriding the usual type inference. You only specify the columns you want to override, eg.
columntypes={'phone': 'varchar(12)', 'age': 'int'})
.- specifycols: boolean
Adds a column list to the Redshift COPY command, allowing for the source table in an append to have the columns out of order, and to have fewer columns with any leftover target table columns filled in with the DEFAULT value.
This will fail if all of the source table’s columns do not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.
- alter_table: boolean
Will check if the target table varchar widths are wide enough to copy in the table data. If not, will attempt to alter the table to make it wide enough. This will not work with tables that have dependent views. To drop them, set
alter_table_cascade
to True.- alter_table_cascade: boolean
Will drop dependent objects when attempting to alter the table. If
alter_table
isFalse
, this will be ignored.- aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
- aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
- iam_role: str
An AWS IAM Role ARN string; an alternative credential for the COPY command from Redshift to S3. The IAM role must have been assigned to the Redshift instance and have access to the S3 bucket.
- cleanup_s3_file: boolean
The s3 upload is removed by default on cleanup. You can set to False for debugging.
- template_table: str
Instead of specifying columns, columntypes, and/or inference, if there is a pre-existing table that has the same columns/types, then use the template_table table name as the schema for the new table. Unless you explicitly set specifycols=False, providing a template_table will set specifycols to True.
- temp_bucket_region: str
The AWS region that the temp bucket (specified by the TEMP_S3_BUCKET environment variable) is located in. This should be provided if the Redshift cluster is located in a different region from the temp bucket.
- strict_length: bool
Whether or not to tightly fit the length of the table columns to the length of the data in
tbl
; ifpadding
is specified, this argument is ignored.- csv_encoding: str
String encoding to use when writing the temporary CSV file that is uploaded to S3. Defaults to ‘utf-8’.
- Returns:
- Parsons Table or None
See Parsons Table for output options.
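For example, a copy that appends to an existing table and overrides the inferred type of one column might look like this sketch (table and column names are hypothetical):

rs.copy(tbl, 'my_schema.contacts',
        if_exists='append',
        distkey='contact_id',
        sortkey='created_at',
        columntypes={'phone': 'varchar(12)'},
        alter_table=True)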
- parsons.Redshift.copy_s3(self, table_name, bucket, key, manifest=False, data_type='csv', csv_delimiter=',', compression=None, if_exists='fail', max_errors=0, distkey=None, sortkey=None, padding=None, varchar_max=None, statupdate=True, compupdate=True, ignoreheader=1, acceptanydate=True, dateformat='auto', timeformat='auto', emptyasnull=True, blanksasnull=True, nullas=None, acceptinvchars=True, truncatecolumns=False, columntypes=None, specifycols=None, aws_access_key_id=None, aws_secret_access_key=None, bucket_region=None, strict_length=True, template_table=None, encoding='utf-8', line_delimited=False)
Copy a file from s3 to Redshift.
- Args:
- table_name: str
The table name and schema (
tmc.cool_table
) into which the file will be copied.- bucket: str
The s3 bucket where the file or manifest is located.
- key: str
The key of the file or manifest in the s3 bucket.
- manifest: boolean
Whether the key points to a manifest file that lists multiple data files, rather than to a single data file.
- data_type: str
The data type of the file. Only
csv
supported currently.- csv_delimiter: str
The delimiter of the
csv
. Only relevant if data_type iscsv
.- compression: str
If specified (
gzip
), will attempt to decompress the file.- if_exists: str
If the table already exists, either
fail
,append
,drop
ortruncate
the table.- max_errors: int
The maximum number of rows that can error and be skipped before the job fails.
- distkey: str
The column name of the distkey
- sortkey: str
The column name of the sortkey
- padding: float
A percentage padding to add to varchar columns if creating a new table. This is helpful to add a buffer for future copies in which the data might be wider.
- varchar_max: list
A list of columns in which to set the width of the varchar column to 65,535 characters.
- statupdate: boolean
Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command.
- compupdate: boolean
Controls whether compression encodings are automatically applied during a COPY.
- ignoreheader: int
The number of header rows to skip. Ignored if data_type is
json
.- acceptanydate: boolean
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
- emptyasnull: boolean
Indicates that Amazon Redshift should load empty char and varchar fields as
NULL
.- blanksasnull: boolean
Loads blank varchar fields, which consist of only white space characters, as
NULL
.- nullas: str
Loads fields that match string as NULL
- acceptinvchars: boolean
Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
- dateformat: str
Set the date format. Defaults to
auto
.- timeformat: str
Set the time format. Defaults to
auto
.- truncatecolumns: boolean
If the table already exists, truncates data in columns to the appropriate number of characters so that it fits the column specification. Applies only to columns with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
- columntypes: dict
Optional map of column name to redshift column type, overriding the usual type inference. You only specify the columns you want to override, eg.
columntypes={'phone': 'varchar(12)', 'age': 'int'})
.- specifycols: boolean
Adds a column list to the Redshift COPY command, allowing for the source table in an append to have the columns out of order, and to have fewer columns with any leftover target table columns filled in with the DEFAULT value.
This will fail if all of the source table’s columns do not match a column in the target table. This will also fail if the target table has an IDENTITY column and that column name is among the source table’s columns.
- aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
- aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
- bucket_region: str
The AWS region that the bucket is located in. This should be provided if the Redshift cluster is located in a different region from the temp bucket.
- strict_length: bool
If the database table needs to be created, strict_length determines whether the created table’s column sizes will be sized to exactly fit the current data, or if their size will be rounded up to account for future values being larger than the current dataset. Defaults to
True
; this argument is ignored ifpadding
is specified- template_table: str
Instead of specifying columns, columntypes, and/or inference, if there is a pre-existing table that has the same columns/types, then use the template_table table name as the schema for the new table.
- Returns:
- Parsons Table or None
See Parsons Table for output options.
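For example, loading a gzipped CSV that already sits in S3 might look like this sketch (bucket, key, and table names are hypothetical):

rs.copy_s3('my_schema.event_data',
           bucket='my-temp-s3-bucket',
           key='exports/events.csv.gz',
           compression='gzip',
           if_exists='drop',
           ignoreheader=1)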
- parsons.Redshift.unload(self, sql, bucket, key_prefix, manifest=True, header=True, delimiter='|', compression='gzip', add_quotes=True, null_as=None, escape=True, allow_overwrite=True, parallel=True, max_file_size='6.2 GB', extension=None, aws_region=None, aws_access_key_id=None, aws_secret_access_key=None)
Unload Redshift data to S3 Bucket. This is a more efficient method than running a query to export data as it can export in parallel and directly into an S3 bucket. Consider using this for exports of 10MM or more rows.
- sql: str
The SQL string to execute to generate the data to unload.
- bucket: str
The destination S3 bucket
- key_prefix: str
The prefix of the key names that will be written
- manifest: boolean
Creates a manifest file that explicitly lists details for the data files that are created by the UNLOAD process.
- header: boolean
Adds a header line containing column names at the top of each output file.
- delimiter: str
Specifies the character used to separate fields. Defaults to ‘|’.
- compression: str
One of
gzip
,bzip2
orNone
. Unloads data to one or more compressed files per slice. Each resulting file is appended with a.gz
or.bz2
extension.- add_quotes: boolean
Places quotation marks around each unloaded data field, so that Amazon Redshift can unload data values that contain the delimiter itself.
- null_as: str
Specifies a string that represents a null value in unload files. If this option is not specified, null values are unloaded as zero-length strings for delimited output.
- escape: boolean
For CHAR and VARCHAR columns in delimited unload files, an escape character (\) is placed before every linefeed, carriage return, escape character, and delimiter.
- allow_overwrite: boolean
If
True
, will overwrite existing files, including the manifest file. IfFalse
will fail.- parallel: boolean
By default, UNLOAD writes data in parallel to multiple files, according to the number of slices in the cluster. The default option is ON or TRUE. If PARALLEL is OFF or FALSE, UNLOAD writes to one or more data files serially, sorted absolutely according to the ORDER BY clause, if one is used.
- max_file_size: str
The maximum size of files UNLOAD creates in Amazon S3. Specify a decimal value between 5 MB and 6.2 GB.
- extension: str
This extension will be added to the end of file names loaded to S3
- aws_region: str
The AWS Region where the target Amazon S3 bucket is located. REGION is required for UNLOAD to an Amazon S3 bucket that is not in the same AWS Region as the Amazon Redshift cluster.
- aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
- aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not required if keys are stored as environmental variables.
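A sketch of unloading a query result to S3 (bucket, prefix, and query are hypothetical):

rs.unload("select * from my_schema.big_table where created_at >= '2023-01-01'",
          bucket='my-export-bucket',
          key_prefix='exports/big_table_',
          compression='gzip',
          header=True)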
- parsons.Redshift.upsert(self, table_obj, target_table, primary_key, vacuum=True, distinct_check=True, cleanup_temp_table=True, alter_table=True, alter_table_cascade=False, from_s3=False, distkey=None, sortkey=None, **copy_args)
Perform an upsert on an existing table. An upsert is a function in which rows in a table are updated and inserted at the same time.
- Args:
- table_obj: obj
A Parsons table object
- target_table: str
The schema and table name to upsert
- primary_key: str or list
The primary key column(s) of the target table
- vacuum: boolean
Re-sorts rows and reclaims space in the specified table. You must be a table owner or super user to effectively vacuum a table; however, the method will not fail if you lack these privileges.
- distinct_check: boolean
Check if the primary key column is distinct. Raise error if not.
- cleanup_temp_table: boolean
A temp table is dropped by default on cleanup. You can set to False for debugging.
- alter_table: boolean
Set to False to avoid automatic varchar column resizing to accommodate new data
- alter_table_cascade: boolean
Will drop dependent objects when attempting to alter the table. If
alter_table
isFalse
, this will be ignored.- from_s3: boolean
Instead of specifying a table_obj (set the first argument to None), set this to True and include
copy_s3()
arguments to upsert a pre-existing s3 file into the target_table- distkey: str
The column name of the distkey. If not provided, will default to
primary_key
.- sortkey: str or list
The column name(s) of the sortkey. If not provided, will default to
primary_key
.- **copy_args: kwargs
See
copy()
for options.
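A sketch of an upsert keyed on a hypothetical primary key column, first from a Parsons Table and then from a file already in S3:

rs.upsert(tbl, 'my_schema.contacts', primary_key='contact_id')

# Upsert from a pre-existing s3 file instead of a Parsons Table,
# passing copy_s3() arguments through **copy_args:
rs.upsert(None, 'my_schema.contacts', primary_key='contact_id',
          from_s3=True, bucket='my-temp-s3-bucket', key='exports/contacts.csv')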
- parsons.Redshift.generate_manifest(self, buckets, aws_access_key_id=None, aws_secret_access_key=None, mandatory=True, prefix=None, manifest_bucket=None, manifest_key=None, path=None)
Given a list of S3 buckets, generate a manifest file (JSON format). A manifest file allows you to copy multiple files into a single table at once. Once the manifest is generated, you can pass it with the
copy_s3()
method.AWS keys are not required if
AWS_ACCESS_KEY_ID
andAWS_SECRET_ACCESS_KEY
environmental variables are set.Args:
- buckets: list or str
A list of buckets or single bucket from which to generate manifest
- aws_access_key_id: str
AWS access key id to access S3 bucket
- aws_secret_access_key: str
AWS secret access key to access S3 bucket
- mandatory: boolean
The mandatory flag indicates whether the Redshift COPY should terminate if the file does not exist.
- prefix: str
Optional filter for key prefixes
- manifest_bucket: str
Optional bucket to write manifest file.
- manifest_key: str
Optional key name for S3 bucket to write file
- Returns:
dict
of manifest
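A sketch combining generate_manifest with copy_s3 (bucket, prefixes, and table name are hypothetical):

rs.generate_manifest('my-data-bucket',
                     prefix='exports/2023/',
                     manifest_bucket='my-data-bucket',
                     manifest_key='manifests/2023_exports.manifest')

rs.copy_s3('my_schema.all_exports',
           bucket='my-data-bucket',
           key='manifests/2023_exports.manifest',
           manifest=True,
           if_exists='drop')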
- parsons.Redshift.alter_table_column_type(self, table_name, column_name, data_type, varchar_width=None)
Alter a column type of an existing table.
- table_name: str
The table name (ex.
my_schema.my_table
).- column_name: str
The target column name
- data_type: str
A valid Redshift data type to alter the table to.
- varchar_width:
The new width of the column if of type varchar.
Table and View API
Table and view utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.
- class parsons.databases.redshift.redshift.RedshiftTableUtilities[source]
- table_exists(table_name: str, view: bool = True) bool [source]
Check if a table or view exists in the database.
- Args:
- table_name: str
The table name and schema (e.g.
myschema.mytable
).- view: boolean
Check to see if a view exists by the same name
- Returns:
- boolean
True
if the table exists andFalse
if it does not.
- get_row_count(table_name)[source]
Return the row count of a table.
SQL Code
SELECT COUNT(*) FROM myschema.mytable
- Args:
- table_name: str
The schema and name (e.g.
myschema.mytable
) of the table.
- Returns:
int
- rename_table(table_name, new_table_name)[source]
Rename an existing table.
Note
You cannot move schemas when renaming a table. Instead, utilize the
duplicate_table()
method.- Args:
- table_name: str
Name of existing schema and table (e.g.
myschema.oldtable
)- new_table_name: str
New name for table with the schema omitted (e.g.
newtable
).
- move_table(source_table, new_table, drop_source_table=False)[source]
Move an existing table in the database. It will inherit encoding, sortkey and distkey. Once run, the source table rows will be empty. This is more efficient than running
"create newtable as select * from oldtable"
.For more information see: ALTER TABLE APPEND
- Args:
- source_table: str
Name of existing schema and table (e.g.
my_schema.old_table
)- new_table: str
New name of schema and table (e.g.
my_schema.newtable
- drop_source_table: boolean
Drop the source table.
- Returns:
None
- populate_table_from_query(query, destination_table, if_exists='fail', distkey=None, sortkey=None)[source]
Populate a Redshift table with the results of a SQL query, creating the table if it doesn’t yet exist.
- Args:
- query: str
The SQL query
- destination_table: str
Name of destination schema and table (e.g.
my_schema.new_table
)- if_exists: str
If the table already exists, either
fail
,append
,drop
, ortruncate
the table.- distkey: str
The column to use as the distkey for the table.
- sortkey: str
The column to use as the sortkey for the table.
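For instance (schema, table, and column names are hypothetical):

rs.populate_table_from_query("select state, count(*) as voters from my_schema.voters group by 1",
                             'my_schema.voters_by_state',
                             if_exists='drop',
                             sortkey='state')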
- duplicate_table(source_table, destination_table, where_clause='', if_exists='fail', drop_source_table=False)[source]
Create a copy of an existing table (or subset of rows) in a new table. It will inherit encoding, sortkey and distkey.
- Args:
- source_table: str
Name of existing schema and table (e.g.
myschema.oldtable
)- destination_table: str
Name of destination schema and table (e.g.
myschema.newtable
)- where_clause: str
An optional where clause (e.g.
where org = 1
).- if_exists: str
If the table already exists, either
fail
,append
,drop
, ortruncate
the table.- drop_source_table: boolean
Drop the source table
- union_tables(new_table_name, tables, union_all=True, view=False)[source]
Union a series of tables into a new table.
- Args:
- new_table_name: str
The new table and schema (e.g.
myschema.newtable
)- tables: list
A list of tables to union
- union_all: boolean
If
False
will deduplicate rows. IfTrue
will include duplicate rows.- view: boolean
Create a view rather than a static table
- Returns:
None
- get_tables(schema=None, table_name=None)[source]
List the tables in a schema including metadata.
- Args:
- schema: str
Filter by a schema
- table_name: str
Filter by a table name
- Returns:
- Parsons Table
See Parsons Table for output options.
- get_table_stats(schema=None, table_name=None)[source]
List table statistics, including row count and size.
Warning
This method is only accessible by Redshift superusers.
- Args:
- schema: str
Filter by a schema
- table_name: str
Filter by a table name
- Returns:
- Parsons Table
See Parsons Table for output options.
- get_columns(schema, table_name)[source]
Gets the column names (and some other column info) for a table.
If you just need the column names, run
get_columns_list()
as it is faster.
for col in rs.get_columns('some_schema', 'some_table'):
    print(col)
- Args:
- schema: str
The schema name
- table_name: str
The table name
- Returns:
A dict mapping column name to a dict with extra info. The keys of the dict are ordered just like the columns in the table. The extra info is a dict with format
{ 'data_type': str, 'max_length': int or None, 'max_precision': int or None, 'max_scale': int or None, 'is_nullable': bool }
- get_columns_list(schema, table_name)[source]
Gets just the column names for a table.
- Args:
- schema: str
The schema name
- table_name: str
The table name
- Returns:
A list of column names.
- get_views(schema=None, view=None)[source]
List views.
- Args:
- schema: str
Filter by a schema
- view: str
Filter by a view name
- Returns:
- Parsons Table
See Parsons Table for output options.
- get_queries()[source]
Return the current queries running and queueing, along with resource consumption.
Warning
Must be a Redshift superuser to run this method.
- Returns:
- Parsons Table
See Parsons Table for output options.
- get_max_value(table_name, value_column)[source]
Return the max value from a table.
- Args:
- table_name: str
Schema and table name
- value_column: str
The column containing the values
- get_object_type(object_name)[source]
Get object type.
One of view, table, index, sequence, or TOAST table.
- Args:
- object_name: str
The schema.obj for which to get the object type.
- Returns:
str of the object type.
- is_view(object_name)[source]
Return true if the object is a view.
- Args:
- object_name: str
The schema.obj to test if it’s a view.
- Returns:
bool
- is_table(object_name)[source]
Return true if the object is a table.
- Args:
- object_name: str
The schema.obj to test if it’s a table.
- Returns:
bool
- get_table_definition(table)[source]
Get the table definition (i.e. the create statement).
- Args:
- table: str
The schema.table for which to get the table definition.
- Returns:
str
- get_table_definitions(schema=None, table=None)[source]
Get the table definition (i.e. the create statement) for multiple tables.
This works similar to get_table_def except it runs a single query to get the ddl for multiple tables. It supports SQL wildcards for schema and table. Only returns the ddl for _tables_ that match schema and table if they exist.
- Args:
- schema: str
The schema to filter by.
- table: str
The table to filter by.
- Returns:
list of dicts with matching tables.
- get_view_definition(view)[source]
Get the view definition (i.e. the create statement).
- Args:
- view: str
The schema.view for which to get the view definition.
- Returns:
str
- get_view_definitions(schema=None, view=None)[source]
Get the view definition (i.e. the create statement) for multiple views.
This works similar to get_view_def except it runs a single query to get the ddl for multiple views. It supports SQL wildcards for schema and view. Only returns the ddl for _views_ that match schema and view if they exist.
- Args:
- schema: str
The schema to filter by.
- view: str
The view to filter by.
- Returns:
list of dicts with matching views.
- static split_full_table_name(full_table_name)[source]
Split a full table name into its schema and table. If a schema isn’t present, return public for the schema; similarly, Redshift defaults to the public schema when one isn’t provided.
Eg:
(schema, table) = Redshift.split_full_table_name("some_schema.some_table")
- Args:
- full_table_name: str
The table name, as “schema.table”
- Returns:
- tuple
A tuple containing (schema, table)
Schema API
Schema utilities are a series of helper methods, all built off of commonly used SQL queries run against the Redshift database.
- class parsons.databases.redshift.redshift.RedshiftSchema[source]
- create_schema_with_permissions(schema, group=None)[source]
Creates a Redshift schema (if it doesn’t already exist), and grants usage permissions to a Redshift group (if specified).
- Args:
- schema: str
The schema name
- group: str
The Redshift group name
- type: str
The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)
- grant_schema_permissions(schema, group, permissions_type='select')[source]
Grants a Redshift group permissions to all tables within an existing schema.
- Args:
- schema: str
The schema name
- group: str
The Redshift group name
- permissions_type: str
The type of permissions to grant. Supports select, all, etc. (For full list, see the Redshift GRANT docs)
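A brief sketch using both methods (schema and group names are hypothetical):

# Create the schema if it doesn't already exist and grant usage to an existing Redshift group
rs.create_schema_with_permissions('analytics_scratch', group='analysts')

# Grant SELECT on all tables within the schema to the group
rs.grant_schema_permissions('analytics_scratch', 'analysts', permissions_type='select')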