Amazon AWS¶
Overview¶
Amazon AWS is a very large ecosystem, and Parsons' AWS integrations rely heavily on the boto3 Python package.
The two AWS services with the most popular and best-supported Parsons connectors are S3 and Redshift.
Distributing Tasks on Lambda¶
- parsons.aws.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3')[source]¶
Distribute processing rows in a table across multiple AWS Lambda invocations.
If you are running the processing of a table inside AWS Lambda, then you are limited by how many rows can be processed within the Lambda’s time limit (at time-of-writing, maximum 15min).
Based on experience and some napkin math, with the same data that would allow 1000 rows to be processed inside a single AWS Lambda instance, this method allows 10 MILLION rows to be processed.
Rather than converting the table to SQS or other options, the fastest way is to upload the table to S3 as a CSV, and then trigger multiple Lambda sub-invocations, each of which is sent a byte range of the S3 CSV file to process.
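To make the byte-range idea concrete, here is a minimal, hypothetical sketch (plain Python, not Parsons internals) of splitting a CSV into row-aligned byte ranges, so that each sub-invocation can fetch and parse only its slice:

```python
import csv
import io

def byte_ranges(csv_bytes, group_count):
    """Split CSV data (after the header) into byte ranges, each
    covering up to group_count complete, newline-terminated rows."""
    header_end = csv_bytes.index(b"\n") + 1
    ranges = []
    start = header_end
    rows = 0
    pos = header_end
    while pos < len(csv_bytes):
        pos = csv_bytes.index(b"\n", pos) + 1  # advance to end of this row
        rows += 1
        if rows == group_count:
            ranges.append((start, pos))
            start = pos
            rows = 0
    if start < len(csv_bytes):  # trailing partial group
        ranges.append((start, len(csv_bytes)))
    return header_end, ranges

data = b"id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
header_end, ranges = byte_ranges(data, 2)
# Each sub-invocation would fetch only its byte range from S3
# (e.g. via a Range request) plus the header, and parse it alone:
for start, end in ranges:
    chunk = data[:header_end] + data[start:end]
    rows = list(csv.DictReader(io.StringIO(chunk.decode())))
```

Because every range ends on a row boundary, each chunk is a valid standalone CSV once the header is prepended.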
Using this method requires some setup. You have three tasks:
Define the function to process rows. Its first argument must take your table's data (though only a subset of rows will be passed at a time), e.g. def task_for_distribution(table, **kwargs):
Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs) (either setting the env var S3_TEMP_BUCKET or passing a bucket= parameter)
Set up your Lambda handler to include parsons.aws.event_command() (or run and deploy your Lambda with Zappa)
To test locally, include the argument storage="local", which will exercise the distribute_task function but run the task sequentially in local memory.
A minimalistic example Lambda handler might look something like this:
    from parsons.aws import event_command, distribute_task

    def process_table(table, foo, bar=None):
        for row in table:
            do_sloooooow_thing(row, foo, bar)

    def handler(event, context):
        ## ADD THESE TWO LINES TO TOP OF HANDLER:
        if event_command(event, context):
            return
        table = FakeDatasource.load_to_table(username='123', password='abc')
        # table is so big that running
        #   process_table(table, foo=789, bar='baz')
        # would timeout, so instead we:
        distribute_task(table, process_table,
                        bucket='my-temp-s3-bucket',
                        func_kwargs={'foo': 789, 'bar': 'baz'})
- Args:
- table: Parsons Table
Table of data whose processing you wish to distribute across Lambda invocations of the func_to_run function.
- func_to_run: function
The function you want to run; its first argument will be a subset of table.
- bucket: str
The bucket name to use for the S3 upload to process the whole table. Not required if you set the environment variable S3_TEMP_BUCKET.
- func_kwargs: dict
If the function has other arguments to pass along with table then provide them as a dict here. They must all be JSON-able.
- func_class: class
If the function is a classmethod or function on a class, then pass the pure class here. E.g. If you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.
- func_class_kwargs: dict
If it is a class function, and the class must be instantiated, then pass the kwargs to instantiate the class here. E.g. If you passed ActionKit.bulk_upload_table as the function, then you would pass {'domain': …, 'username': … etc} here. This must all be JSON-able data.
- catch: bool
Lambda will retry running an event up to three times if there is an exception. To prevent this, set catch=True, and any errors will be caught, stopping the retries. The error will appear in CloudWatch logs with the string "Distribute Error". This might be important if row actions are not idempotent and a failure in your function could cause repeats.
- group_count: int
Set this to how many rows to process with each Lambda invocation (Default: 100)
- storage: str
Debugging option. Defaults to "s3". To test distribution locally without S3, set to "local".
- Returns:
Debug information – do not rely on the output, as it will change depending on how this method is invoked.
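For the func_class/func_class_kwargs pattern, the class and its JSON-able kwargs travel with the task so that each sub-invocation can re-instantiate the class and bind the method again by name. A rough, hypothetical illustration of that re-binding (a stand-in class, not the actual Parsons implementation):

```python
import json

class Uploader:
    """Hypothetical stand-in for a connector class like ActionKit."""
    def __init__(self, domain, username):
        self.domain = domain
        self.username = username

    def bulk_upload(self, rows):
        return f"{self.username}@{self.domain}: {len(rows)} rows"

# What distribute_task would be given: the pure class plus
# JSON-able kwargs to rebuild an instance in each sub-invocation.
func_class_kwargs = {"domain": "example.org", "username": "alice"}

# The kwargs must survive a JSON round-trip (hence "JSON-able"):
payload = json.loads(json.dumps(func_class_kwargs))

# Inside a sub-invocation: re-instantiate and re-bind by name.
instance = Uploader(**payload)
bound = getattr(instance, "bulk_upload")
result = bound([{"id": 1}, {"id": 2}])
```

This is why func_class_kwargs must be JSON-serializable: live objects such as open connections cannot cross the invocation boundary, only the data needed to rebuild them.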
- parsons.aws.event_command(event, context)[source]¶
Minimal shim to add to the top of the Lambda handler function to enable distributed tasks. In your Lambda handler:
    from parsons.aws import event_command

    def handler(event, context):
        ## ADD THESE TWO LINES TO TOP OF HANDLER:
        if event_command(event, context):
            return
The rest of this library is compatible with the zappa.async library. If you have deployed your app with Zappa (https://github.com/Miserlou/Zappa), then you do NOT need to add this shim.