# Source code for parsons.aws.lambda_distribute

import csv
from io import TextIOWrapper, BytesIO, StringIO
import logging
import sys
import traceback
import time

from parsons.aws.aws_async import get_func_task_path, import_and_get_task, run as maybe_async_run
from parsons.aws.s3 import S3
from parsons.etl.table import Table
from parsons.utilities.check_env import check

logger = logging.getLogger(__name__)


class DistributeTaskException(Exception):
    """Error raised by this module when a distribute call is given invalid arguments."""


class TestStorage:
    """
    In-memory stand-in for S3Storage, used for local testing/debugging.

    Objects are kept in the ``data`` dict keyed by ``key`` only; the
    ``bucket`` argument is accepted for interface parity and ignored.
    """

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it (and warn) whenever it is imported into a test module.
    __test__ = False

    def __init__(self):
        self.data = {}

    def put_object(self, bucket, key, object_bytes, **kwargs):
        """
        Store ``object_bytes`` under ``key``.

        Extra keyword arguments are accepted (and ignored) so that this
        method can be called the same way as ``S3Storage.put_object``.
        """
        self.data[key] = object_bytes

    def get_range(self, bucket, key, rangestart, rangeend):
        """
        Return the bytes stored under ``key`` from ``rangestart`` up to,
        but not including, ``rangeend`` (regular python slice semantics).
        """
        return self.data[key][rangestart:rangeend]


class S3Storage:
    """
    Small S3 helper used only by the distribute functions in this file.

    The operations are specialized enough that they live here
    instead of in s3.py
    """

    def __init__(self, use_env_token=True):
        self.s3 = S3(use_env_token=use_env_token)

    def put_object(self, bucket, key, object_bytes, **kwargs):
        """
        Upload ``object_bytes`` to ``bucket``/``key`` and return the client's
        response. Any extra keyword arguments are forwarded to the client call.
        """
        client = self.s3.client
        return client.put_object(Bucket=bucket, Key=key, Body=object_bytes, **kwargs)

    def get_range(self, bucket, key, rangestart, rangeend):
        """
        Fetch an explicit byte-range of an S3 file, using python slice
        semantics: ``rangeend`` itself is NOT included in the result.
        """
        # The Range header treats its end position as INCLUSIVE, unlike a
        # python slice: data[2:4] is 2 bytes, but "bytes=2-4" would be 3.
        # Hence the last byte requested is rangeend - 1.
        last_byte = rangeend - 1
        byte_range = f'bytes={rangestart}-{last_byte}'
        s3_object = self.s3.client.get_object(Bucket=bucket, Key=key, Range=byte_range)
        return s3_object['Body'].read()


# Module-level in-memory store; the distribute functions below read/write it
# instead of S3 whenever their `storage` argument is not 's3'.
FAKE_STORAGE = TestStorage()
# Key prefix under which the temporary CSV uploads are stored.
S3_TEMP_KEY_PREFIX = "Parsons_DistributeTask"


def distribute_task_csv(csv_bytes_utf8, func_to_run, bucket,
                        header=None,
                        func_kwargs=None,
                        func_class=None,
                        func_class_kwargs=None,
                        catch=False,
                        group_count=100,
                        storage='s3',
                        use_s3_env_token=True):
    """
    The same as distribute_task, but instead of a table, the
    first argument is bytes of a csv encoded into utf8.
    This function is used by distribute_task() which you should use instead.

    The csv bytes are uploaded once, then one process_task_portion() run is
    started per group of ``group_count`` rows, each given only the byte
    range of its group.

    `Args:`
        csv_bytes_utf8: bytes
            CSV data rows (no header row), one row per line.
        func_to_run: function
            The function to run on each group of rows.
        bucket: str
            Bucket name to upload the csv to.
        header: list
            Column names, passed along to every process_task_portion() run.
        func_kwargs: dict
            Extra keyword arguments for `func_to_run`.
        func_class: class
            Class that `func_to_run` belongs to, if any.
        func_class_kwargs: dict
            Keyword arguments used to instantiate `func_class`.
        catch: bool
            Passed along to process_task_portion().
        group_count: int
            Number of rows per group (Default: 100)
        storage: str
            "s3" to upload to S3; any other value keeps the data in the
            in-process fake storage and forces the tasks to run locally.
        use_s3_env_token: bool
            Passed to the S3 storage as its use_env_token parameter.

    `Returns:`
        dict with keys 'DEBUG_ONLY', 'results' (one entry per group) and
        'put_response' (the upload response). Debug information only.
    """
    func_name = get_func_task_path(func_to_run, func_class)

    # Gather [start, end) byte offsets for each row. A row runs up to and
    # including its \n terminator; a final row with no terminator ends at the
    # end of the data, so it is neither dropped nor given an offset past the
    # end of the data.
    # NOTE: rows are split on every \n byte, so a quoted field containing an
    # embedded newline counts as several rows and may straddle two groups.
    total_bytes = len(csv_bytes_utf8)
    row_ranges = []
    cursor = 0
    while cursor < total_bytes:
        newline_at = csv_bytes_utf8.find(b'\n', cursor)
        row_end = total_bytes if newline_at == -1 else newline_at + 1
        row_ranges.append((cursor, row_end))
        cursor = row_end

    # group the rows and get start/end bytes for each group
    row_count = len(row_ranges)
    group_ranges = []
    for first_row in range(0, row_count, group_count):
        last_row = min(first_row + group_count, row_count) - 1
        group_ranges.append((row_ranges[first_row][0], row_ranges[last_row][1]))

    # upload data
    filename = hash(time.time())
    storagekey = f"{S3_TEMP_KEY_PREFIX}/{filename}.csv"
    groupcount = len(group_ranges)
    logger.debug(f'distribute_task_csv storagekey {storagekey} w/ {groupcount} groups')

    use_s3 = storage == 's3'
    if use_s3:
        response = S3Storage(use_env_token=use_s3_env_token).put_object(
            bucket, storagekey, csv_bytes_utf8
        )
    else:
        response = FAKE_STORAGE.put_object(bucket, storagekey, csv_bytes_utf8)

    # start processes
    results = [
        maybe_async_run(
            process_task_portion,
            [bucket, storagekey, grp_start, grp_end, func_name, header,
             storage, func_kwargs, catch, func_class_kwargs, use_s3_env_token],
            # Data held in the in-process fake storage is only visible to this
            # process, so whenever we did not upload to s3 the task must be
            # run locally, as well (good for testing/debugging)
            remote_aws_lambda_function_name=None if use_s3 else 'FORCE_LOCAL'
        )
        for grp_start, grp_end in group_ranges]
    return {'DEBUG_ONLY': 'results may vary depending on context/platform',
            'results': results,
            'put_response': response}


def distribute_task(table, func_to_run,
                    bucket=None,
                    func_kwargs=None,
                    func_class=None,
                    func_class_kwargs=None,
                    catch=False,
                    group_count=100,
                    storage='s3',
                    use_s3_env_token=True):
    """
    Distribute processing rows in a table across multiple AWS Lambda invocations.

    `Args:`
        table: Parsons Table
            Table of data you wish to distribute processing across Lambda invocations
            of `func_to_run` argument.
        func_to_run: function
            The function you want to run whose
            first argument will be a subset of table
        bucket: str
            The bucket name to use for s3 upload to process the whole table
            Not required if you set environment variable ``S3_TEMP_BUCKET``
        func_kwargs: dict
            If the function has other arguments to pass along with `table`
            then provide them as a dict here. They must all be JSON-able.
        func_class: class
            If the function is a classmethod or function on a class,
            then pass the pure class here.
            E.g. If you passed `ActionKit.bulk_upload_table`,
            then you would pass `ActionKit` here.
        func_class_kwargs: dict
            If it is a class function, and the class must be instantiated,
            then pass the kwargs to instantiate the class here.
            E.g. If you passed `ActionKit.bulk_upload_table` as the function,
            then you would pass {'domain': ..., 'username': ... etc} here.
            This must all be JSON-able data.
        catch: bool
            Lambda will retry running an event three times if there's an
            exception -- if you want to prevent this, set `catch=True`
            and then it will catch any errors and stop retries.
            The error will be in CloudWatch logs with string "Distribute Error"
            This might be important if row-actions are not idempotent and your
            own function might fail causing repeats.
        group_count: int
            Set this to how many rows to process with each Lambda invocation (Default: 100)
        storage: str
            Debugging option: Defaults to "s3". To test distribution locally without s3,
            set to "local".
        use_s3_env_token: bool
            If storage is set to "s3", sets the use_env_token parameter on the S3 storage.

    `Returns:`
        Debug information -- do not rely on the output, as it will change
        depending on how this method is invoked.
    """
    if storage != 's3' and storage != 'local':
        raise DistributeTaskException('storage argument must be s3 or local')

    bucket = check('S3_TEMP_BUCKET', bucket)

    # Serialize only the data rows; the column names travel separately
    # through the `header` argument.
    csv_buffer = StringIO()
    csv.writer(csv_buffer).writerows(table.table.data())
    csv_payload = csv_buffer.getvalue().encode('utf-8-sig')

    return distribute_task_csv(
        csv_payload,
        func_to_run,
        bucket,
        header=table.columns,
        func_kwargs=func_kwargs,
        func_class=func_class,
        func_class_kwargs=func_class_kwargs,
        catch=catch,
        group_count=group_count,
        storage=storage,
        use_s3_env_token=use_s3_env_token,
    )


def process_task_portion(bucket, storagekey, rangestart, rangeend, func_name, header,
                         storage='s3', func_kwargs=None, catch=False,
                         func_class_kwargs=None, use_s3_env_token=True):
    """
    Run the target function on one byte-range slice of an uploaded csv.

    `Args:`
        bucket: str
            Bucket the csv was uploaded to.
        storagekey: str
            Key of the uploaded csv.
        rangestart: int
            First byte of the slice to read.
        rangeend: int
            End of the slice to read (exclusive, like a python slice).
        func_name: str
            Task path of the function, resolved with import_and_get_task().
        header: list
            Column names, prepended to the parsed rows to build the Table.
        storage: str
            "s3" reads from S3; any other value reads from the in-process
            fake storage.
        func_kwargs: dict
            Extra keyword arguments for the function. ``None`` means none.
        catch: bool
            If True, an exception raised by the function is caught and a
            dict describing it is returned instead of being raised.
        func_class_kwargs: dict
            Passed to import_and_get_task() along with `func_name`.
        use_s3_env_token: bool
            Passed to the S3 storage as its use_env_token parameter.

    `Returns:`
        ``None`` normally. With ``catch=True`` and a failing function, a dict
        with 'Exception': 'Distribute Error', the formatted traceback under
        'error', and the arguments identifying this portion.
    """
    logger.debug(f'process_task_portion func_name {func_name}, '
                 f'storagekey {storagekey}, byterange {rangestart}-{rangeend}')

    # func_kwargs defaults to None and `**None` raises a TypeError,
    # so normalize it to an empty dict before calling the function.
    if func_kwargs is None:
        func_kwargs = {}

    func = import_and_get_task(func_name, func_class_kwargs)
    if storage == 's3':
        filedata = S3Storage(use_env_token=use_s3_env_token).get_range(
            bucket, storagekey, rangestart, rangeend
        )
    else:
        filedata = FAKE_STORAGE.get_range(bucket, storagekey, rangestart, rangeend)

    lines = list(csv.reader(TextIOWrapper(BytesIO(filedata), encoding='utf-8-sig')))
    table = Table([header] + lines)

    if catch:
        try:
            func(table, **func_kwargs)
        except Exception:
            # In Lambda you can search for '"Distribute Error"' in the logs
            type_, value_, traceback_ = sys.exc_info()
            err_traceback_str = '\n'.join(traceback.format_exception(type_, value_, traceback_))
            return {'Exception': 'Distribute Error',
                    'error': err_traceback_str,
                    'rangestart': rangestart,
                    'rangeend': rangeend,
                    'func_name': func_name,
                    'bucket': bucket,
                    'storagekey': storagekey}
    else:
        func(table, **func_kwargs)