Lambda
Overview
Parsons’ distribute_task() function
allows you to distribute the processing of a table’s rows across multiple
AWS Lambda invocations.
If you process a table inside AWS Lambda, you are limited to the number of rows that can be processed within Lambda’s time limit (at the time of writing, a maximum of 15 minutes).
Based on experience and some napkin math, for data where a single AWS Lambda invocation can process about 1,000 rows, this method can process about 10 million rows.
Rather than converting the table to SQS messages or using other options, the fastest way is to upload the table to S3 as a CSV file and then invoke multiple Lambda sub-invocations, each of which is sent the byte range of the S3 CSV file that it should process.
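The byte-range idea can be illustrated with a minimal, standard-library-only sketch. It is not Parsons' implementation: the helper names csv_byte_ranges, rows_in_range and range_header are hypothetical, and the sketch only assumes that rows are serialized to CSV and that each worker is handed a start and end offset into those bytes.

```python
import csv
import io


def csv_byte_ranges(rows, group_count):
    """Serialize rows to CSV bytes and compute one byte range per group.

    Returns (data, ranges); each range is (start, end) with end exclusive
    and covers at most group_count rows.
    """
    chunks = []
    for row in rows:
        line = io.StringIO()
        csv.writer(line).writerow(row)
        chunks.append(line.getvalue().encode("utf-8"))
    data = b"".join(chunks)

    ranges = []
    start = 0
    for i in range(0, len(chunks), group_count):
        size = sum(len(chunk) for chunk in chunks[i:i + group_count])
        ranges.append((start, start + size))
        start += size
    return data, ranges


def rows_in_range(data, byte_range):
    """What a single worker would do: parse only its own slice of the bytes."""
    start, end = byte_range
    text = data[start:end].decode("utf-8")
    return list(csv.reader(io.StringIO(text, newline="")))


def range_header(start, end):
    """HTTP Range headers use an inclusive end offset."""
    return f"bytes={start}-{end - 1}"


# Hypothetical data: 10 rows split into groups of 4 gives 3 ranges.
rows = [[str(i), f"name-{i}"] for i in range(10)]
data, ranges = csv_byte_ranges(rows, group_count=4)
second_group = rows_in_range(data, ranges[1])
```

Because every range starts and ends on a row boundary, each worker can parse its slice without seeing the rest of the file.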
Using this method requires some setup. You have three tasks:
1. Define the function that processes rows. Its first argument must accept your table’s data, although each call receives only a subset of the rows (e.g. def task_for_distribution(table, **kwargs):).
2. Where you would have run task_for_distribution(my_table, **kwargs), instead call distribute_task(my_table, task_for_distribution, func_kwargs=kwargs), either setting the environment variable S3_TEMP_BUCKET or passing a bucket= parameter.
3. Set up your Lambda handler to include event_command() (or run and deploy your Lambda with Zappa).
To test locally, include the argument storage="local",
which will test the distribute_task() function,
but run the task sequentially and in local memory.
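Before relying on distribution, it can help to confirm that your task gives the same overall result when it only ever sees subsets of the rows. The sketch below is a plain-Python stand-in for that check, not Parsons code and not a replacement for storage="local". The names run_in_groups and count_long_names are hypothetical, and plain lists of dicts are used instead of a Parsons Table.

```python
def run_in_groups(rows, task, group_count=100, **task_kwargs):
    """Call task once per group of rows, one group after another."""
    results = []
    for start in range(0, len(rows), group_count):
        group = rows[start:start + group_count]
        results.append(task(group, **task_kwargs))
    return results


def count_long_names(rows, min_length=5):
    # Hypothetical task: its result for the whole table is the sum of
    # its results for any partition of the rows.
    return sum(1 for row in rows if len(row["name"]) >= min_length)


rows = [{"name": "a" * (i % 8)} for i in range(250)]

whole = count_long_names(rows, min_length=5)
grouped = run_in_groups(rows, count_long_names, group_count=100, min_length=5)

# 250 rows in groups of 100 means three calls: 100, 100 and 50 rows.
assert sum(grouped) == whole
```

A task that depends on seeing every row at once (for example, de-duplicating across the whole table) would fail this kind of check and is a poor fit for distribution.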
Quickstart
from parsons.aws import event_command, distribute_task

def process_table(table, foo, bar=None):
    for row in table:
        do_sloooooow_thing(row, foo, bar)

def handler(event, context):
    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
        return
    table = FakeDatasource.load_to_table(username='123', password='abc')
    # table is so big that running
    #   process_table(table, foo=789, bar='baz')
    # would timeout so instead we:
    distribute_task(
        table, process_table,
        bucket='my-temp-s3-bucket',
        func_kwargs={'foo': 789, 'bar': 'baz'}
    )
API
- parsons.aws.lambda_distribute.distribute_task(table, func_to_run, bucket=None, func_kwargs=None, func_class=None, func_class_kwargs=None, catch=False, group_count=100, storage='s3', use_s3_env_token=True)
Distribute processing rows in a table across multiple AWS Lambda invocations.
- Parameters:
table – Parsons Table. The table of data whose processing you wish to distribute across Lambda invocations of the func_to_run argument.
func_to_run – function. The function you want to run; its first argument will be a subset of table.
bucket – str. The name of the bucket to use for the S3 upload of the whole table. Not required if you set the environment variable S3_TEMP_BUCKET.
func_kwargs – dict. If the function has other arguments to pass along with table, provide them as a dict here. They must all be JSON-able.
func_class – class. If the function is a classmethod or a function on a class, pass the pure class here. E.g. if you passed ActionKit.bulk_upload_table, then you would pass ActionKit here.
func_class_kwargs – dict. If it is a class function and the class must be instantiated, pass the kwargs to instantiate the class here. E.g. if you passed ActionKit.bulk_upload_table as the function, then you would pass {'domain': …, 'username': … etc} here. This must all be JSON-able data.
catch – bool. Lambda will retry running an event three times if there’s an exception. To prevent this, set catch=True; any errors will then be caught and retries stopped. The error will be in the CloudWatch logs with the string “Distribute Error”. This might be important if row actions are not idempotent and your own function might fail, causing repeats.
group_count – int. How many rows to process with each Lambda invocation (default: 100).
storage – str. Debugging option; defaults to “s3”. To test distribution locally without S3, set to “local”.
use_s3_env_token – bool. If storage is set to “s3”, sets the use_env_token parameter on the S3 storage.
- Returns:
Debug information – do not rely on the output, as it will change depending on how this method is invoked.
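Since func_kwargs and func_class_kwargs must be JSON-able, it can be useful to check them before calling distribute_task(). The following is a small sketch using only the standard json module; the helper name assert_json_able is hypothetical and is not part of Parsons.

```python
import datetime
import json


def assert_json_able(name, kwargs):
    """Return kwargs unchanged, or raise TypeError if json cannot encode them."""
    try:
        json.dumps(kwargs)
    except (TypeError, ValueError) as err:
        raise TypeError(f"{name} is not JSON-able: {err}") from err
    return kwargs


# Plain numbers, strings, lists and dicts pass the check.
good = assert_json_able("func_kwargs", {"foo": 789, "bar": "baz"})

# Objects such as dates are rejected; convert them first (e.g. isoformat()).
try:
    assert_json_able("func_kwargs", {"since": datetime.date(2020, 1, 1)})
    rejected = False
except TypeError:
    rejected = True

# A JSON round trip also changes some types: tuples come back as lists.
round_tripped = json.loads(json.dumps({"ids": (1, 2)}))
```

If your task compares argument types strictly, note the last point: values that survive encoding may still arrive in a different shape than they were sent.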