Amazon AWS

Overview

Amazon AWS is a very large ecosystem. Parsons' AWS connectors rely heavily on the boto3 Python package.

The two AWS services most widely used and best supported in Parsons are S3 and Redshift.

Distributing Tasks on Lambda

parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3')

Distribute the processing of rows in a table across multiple AWS Lambda invocations.

If you process a table inside AWS Lambda, you are limited to the number of rows that can be processed within the Lambda's time limit (at the time of writing, a maximum of 15 minutes).

Based on experience and some napkin math, for data where a single AWS Lambda invocation could process 1,000 rows, this method allows roughly 10 million rows to be processed.

Rather than converting the table to SQS messages or similar, the fastest approach is to upload the table to S3 as a CSV file and then start multiple Lambda sub-invocations, each of which is sent a byte range of that CSV file to process.
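The byte-range idea can be sketched in plain Python. This is an illustration under stated assumptions, not Parsons' internal implementation: the helpers `csv_byte_ranges` and `read_range` are hypothetical, and the sketch assumes no quoted field contains an embedded newline, so each physical line is exactly one row. On S3, a sub-invocation would fetch only its slice, for example with boto3's `get_object(Bucket=..., Key=..., Range="bytes=START-END")`.

```python
import csv
import io


def csv_byte_ranges(csv_bytes, group_count):
    """Hypothetical helper: split a CSV body into inclusive byte ranges of
    up to group_count data rows each, skipping the header line.

    Assumes no field contains an embedded newline (one row per line).
    """
    lines = csv_bytes.splitlines(keepends=True)
    if not lines:
        return []
    offset = len(lines[0])  # data rows start right after the header line
    ranges = []
    for i in range(1, len(lines), group_count):
        size = sum(len(line) for line in lines[i:i + group_count])
        # HTTP Range headers (as used by S3) have inclusive end offsets.
        ranges.append((offset, offset + size - 1))
        offset += size
    return ranges


def read_range(csv_bytes, header, start, end):
    """Parse one byte range locally, as a sub-invocation might after
    fetching just that slice from S3."""
    chunk = csv_bytes[start:end + 1].decode("utf-8")
    return list(csv.DictReader(io.StringIO(chunk), fieldnames=header))


if __name__ == "__main__":
    data = b"id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
    for start, end in csv_byte_ranges(data, group_count=2):
        rows = read_range(data, ["id", "name"], start, end)
        print(f"bytes={start}-{end}", [r["id"] for r in rows])
```

Because the header is only in the first line, each slice is parsed with the column names passed in explicitly; that is one reasonable design, and a real implementation could equally send the header along with each range.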

Using this method requires some setup. You have three tasks:

  1. Define the function that processes rows. Its first argument must accept your table's data, though each invocation will only be passed a subset of rows (e.g. def task_for_distribution(table, **kwargs):).

  2. Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs). Either set the S3_TEMP_BUCKET environment variable or pass a bucket= parameter.

  3. Set up your Lambda handler to call parsons.aws.event_command() (or run and deploy your Lambda with Zappa).

To test locally, pass storage="local". This exercises the distribute_task function but runs the task sequentially and in local memory.
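Conceptually, running sequentially means chunking the rows and calling the function on each chunk in turn. The sketch below is a hypothetical stand-in (`run_locally` and `count_rows` are not Parsons functions, and a plain list of dicts stands in for a Parsons Table) meant only to show that flow; for real testing, use storage="local" itself.

```python
def run_locally(rows, func_to_run, func_kwargs=None, group_count=100):
    """Hypothetical stand-in for local mode: call func_to_run on successive
    chunks of group_count rows, one after another, in this process."""
    func_kwargs = func_kwargs or {}
    results = []
    for start in range(0, len(rows), group_count):
        chunk = rows[start:start + group_count]
        results.append(func_to_run(chunk, **func_kwargs))
    return results


def count_rows(table, foo, bar=None):
    # Example task: report how many rows this chunk received.
    return (len(table), foo, bar)


rows = [{"id": i} for i in range(250)]
print(run_locally(rows, count_rows, {"foo": 789, "bar": "baz"}))
# → [(100, 789, 'baz'), (100, 789, 'baz'), (50, 789, 'baz')]
```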

A minimalistic example Lambda handler might look something like this:

from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    # `table` holds only the subset of rows assigned to this invocation
    for row in table:
        # do_sloooooow_thing is a placeholder for your own per-row work
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER (they hand off distributed sub-task events):
    if event_command(event, context):
        return
    # FakeDatasource is a placeholder for whatever produces your Parsons Table
    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz') would time out
    # so instead we:
    distribute_task(table, process_table,
                    bucket='my-temp-s3-bucket',
                    func_kwargs={'foo': 789, 'bar': 'baz'})

Args:
table: Parsons Table

Table of data whose processing you wish to distribute across Lambda invocations of the func_to_run argument.

func_to_run: function

The function you want to run. Its first argument will be a subset of table.

bucket: str

The name of the S3 bucket to upload the whole table to for processing. Not required if you set the environment variable S3_TEMP_BUCKET.

func_kwargs: dict

If the function takes other arguments in addition to table, provide them as a dict here. They must all be JSON-serializable.
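Because func_kwargs has to travel to other Lambda invocations, one way to catch problems before distributing is to attempt a JSON serialization yourself. `check_json_able` below is a hypothetical helper built on the standard library's json module; it is not part of Parsons.

```python
import datetime
import json


def check_json_able(kwargs):
    """Hypothetical pre-flight check: raise TypeError naming the first
    argument whose value json.dumps cannot serialize."""
    for key, value in kwargs.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError) as exc:  # ValueError: circular refs
            raise TypeError(f"func_kwargs[{key!r}] is not JSON-able: {exc}") from exc
    return kwargs


check_json_able({"foo": 789, "bar": "baz"})  # passes silently
try:
    check_json_able({"when": datetime.date(2024, 1, 1)})
except TypeError as err:
    print(err)
```

Values such as datetime.date fail this check; converting them first (e.g. with .isoformat()) is one way to make them JSON-able.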

func_class: class

If the function is a classmethod or a method on a class, pass the class itself here. E.g. if you passed ActionKit.bulk_upload_table as func_to_run, you would pass ActionKit here.

func_class_kwargs: dict

If the function is a method and the class must be instantiated, pass the kwargs needed to instantiate the class here. E.g. if you passed ActionKit.bulk_upload_table as the function, you would pass {'domain': …, 'username': … etc} here. All of this must be JSON-serializable data.
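Passing the class and its constructor kwargs separately makes sense because a bound method cannot be expressed as JSON, whereas a class, a method name, and JSON-able kwargs can be used to rebuild it in another invocation. The sketch below shows that idea with a hypothetical `FakeConnector` class and `rebuild_and_call` helper; it is an assumption about the general approach, not Parsons' actual code.

```python
import json


class FakeConnector:
    """Hypothetical stand-in for a connector class such as ActionKit."""

    def __init__(self, domain, username):
        self.domain = domain
        self.username = username

    def bulk_upload_table(self, table, batch_name="default"):
        return f"{self.username}@{self.domain}: {len(table)} rows ({batch_name})"


def rebuild_and_call(func_class, method_name, func_class_kwargs, table, func_kwargs):
    """Hypothetical sketch: instantiate the class from JSON-encoded kwargs,
    then look up the named method and call it on the row subset."""
    instance = func_class(**json.loads(func_class_kwargs))
    method = getattr(instance, method_name)
    return method(table, **json.loads(func_kwargs))


# The sending side only ships JSON-able pieces, never the bound method itself.
class_kwargs = json.dumps({"domain": "example.org", "username": "me"})
func_kwargs = json.dumps({"batch_name": "b1"})
method_name = FakeConnector.bulk_upload_table.__name__  # "bulk_upload_table"
print(rebuild_and_call(FakeConnector, method_name, class_kwargs,
                       [{"id": 1}, {"id": 2}], func_kwargs))
# → me@example.org: 2 rows (b1)
```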

catch: bool

If your function raises an exception, Lambda may run the event up to three times (for asynchronous invocations, the default is the original attempt plus two retries). To prevent this, set catch=True: any errors will then be caught and retries stopped. The error will appear in the CloudWatch logs with the string "Distribute Error". This matters if your row actions are not idempotent, since a failure in your own function would otherwise cause rows to be processed repeatedly.
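The effect of catch=True can be pictured as a wrapper that logs the exception instead of re-raising it, so the invocation ends without an error and there is nothing for Lambda to retry. This is a hypothetical sketch using the standard logging module; `run_with_catch` is not a Parsons function, and apart from the "Distribute Error" string, the log format Parsons actually uses may differ.

```python
import logging

logger = logging.getLogger("distribute_sketch")


def run_with_catch(func, table, catch=False, **func_kwargs):
    """Hypothetical sketch of catch semantics: with catch=True, exceptions
    are logged (where CloudWatch would collect them) and swallowed, so the
    invocation does not fail and is not retried."""
    try:
        return func(table, **func_kwargs)
    except Exception:
        if not catch:
            raise  # the invocation fails, so Lambda may retry it
        logger.exception("Distribute Error")
        return None


def flaky(table):
    raise RuntimeError("API timed out")


logging.basicConfig(level=logging.INFO)
print(run_with_catch(flaky, [{"id": 1}], catch=True))  # logs the traceback, prints None
```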

group_count: int

The number of rows to process in each Lambda invocation (default: 100).
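The number of sub-invocations follows directly from group_count: each invocation handles at most group_count rows, so a table of n rows needs ceil(n / group_count) invocations. A small sketch of that arithmetic (the function name `invocation_count` is hypothetical):

```python
def invocation_count(n_rows, group_count=100):
    """Number of Lambda sub-invocations needed if each handles at most
    group_count rows (the last one may receive fewer)."""
    # Integer ceiling division, exact even for very large row counts.
    return (n_rows + group_count - 1) // group_count


print(invocation_count(10_000_000))  # → 100000
print(invocation_count(250))         # → 3
```

With the default group_count of 100, the 10 million rows mentioned above would be spread over 100,000 invocations; raising group_count reduces that number but gives each invocation more work to finish within the time limit.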

storage: str

Debugging option: defaults to "s3". To test distribution locally without S3, set to "local".

Returns:

Debug information – do not rely on the output, as it will change depending on how this method is invoked.

parsons.aws.event_command(event, context)

Minimal shim to add to the top of your Lambda handler function to enable distributed tasks. In your Lambda handler:

from parsons.aws import event_command

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return

The rest of this library is compatible with the zappa.async library. If you have deployed your app with Zappa <https://github.com/Miserlou/Zappa>, you do NOT need to add this shim.
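The two-line shim works because of a dispatch pattern: the handler first checks whether the incoming event is one of the sub-task events sent by distribute_task, runs it if so, and returns early. The sketch below is hypothetical; the "task" key, the `TASKS` registry, and `event_command_sketch` are invented for illustration and do not describe the event format Parsons actually uses.

```python
TASKS = {}  # hypothetical registry of functions that can be run by name


def register(func):
    TASKS[func.__name__] = func
    return func


@register
def process_rows(rows, foo):
    return [f"{foo}:{row['id']}" for row in rows]


def event_command_sketch(event, context):
    """Hypothetical shim: return True after running a sub-task event,
    False for any other event so the normal handler logic continues."""
    task = event.get("task") if isinstance(event, dict) else None
    if task is None:
        return False
    TASKS[task["name"]](task["rows"], **task.get("kwargs", {}))
    return True


def handler(event, context):
    if event_command_sketch(event, context):
        return "handled sub-task"
    return "normal run"


sub_task = {"task": {"name": "process_rows", "rows": [{"id": 1}], "kwargs": {"foo": "x"}}}
print(handler(sub_task, None))                  # → handled sub-task
print(handler({"source": "schedule"}, None))    # → normal run
```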