Source code for parsons.aws.lambda_distribute

import csv
from io import TextIOWrapper, BytesIO, StringIO
import logging
import sys
import traceback
import time

from parsons.aws.aws_async import (
    get_func_task_path,
    import_and_get_task,
    run as maybe_async_run,
)
from parsons.aws.s3 import S3
from parsons.etl.table import Table
from parsons.utilities.check_env import check

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class DistributeTaskException(Exception):
    """Error raised by this module for invalid distribute-task arguments."""


class TestStorage:
    """
    In-memory stand-in for the S3 storage class, keyed only by object key.
    The ``bucket`` argument is accepted for interface parity and ignored.
    """

    def __init__(self):
        # key -> stored bytes
        self.data = dict()

    def put_object(self, bucket, key, object_bytes):
        """Store ``object_bytes`` under ``key``, replacing any previous value."""
        objects = self.data
        objects[key] = object_bytes

    def get_range(self, bucket, key, rangestart, rangeend):
        """Return the python-style slice ``[rangestart:rangeend]`` of the stored bytes."""
        stored = self.data[key]
        return stored[slice(rangestart, rangeend)]


class S3Storage:
    """
    S3 helpers used only by the distribute-task code in this file.

    They are specialized enough that they live here rather than in s3.py.
    """

    def __init__(self, use_env_token=True):
        self.s3 = S3(use_env_token=use_env_token)

    def put_object(self, bucket, key, object_bytes, **kwargs):
        """
        Upload ``object_bytes`` to ``bucket``/``key``. Extra keyword arguments
        are forwarded to the client's ``put_object`` call, whose response is returned.
        """
        client = self.s3.client
        return client.put_object(Bucket=bucket, Key=key, Body=object_bytes, **kwargs)

    def get_range(self, bucket, key, rangestart, rangeend):
        """
        Fetch bytes ``rangestart`` (inclusive) to ``rangeend`` (exclusive) of an S3 file.
        """
        # The Range header's end position is INCLUSIVE, unlike a python slice:
        # data[2:4] is 2 bytes, but "bytes=2-4" is 3. Hence the -1 here.
        last_byte = rangeend - 1
        byte_range = f"bytes={rangestart}-{last_byte}"
        response = self.s3.client.get_object(Bucket=bucket, Key=key, Range=byte_range)
        body = response["Body"]
        return body.read()


# Shared in-memory storage used by distribute_task_csv() and
# process_task_portion() whenever their storage argument is not "s3".
FAKE_STORAGE = TestStorage()
# Key prefix under which distribute_task_csv() stores its uploaded CSV.
S3_TEMP_KEY_PREFIX = "Parsons_DistributeTask"


def _csv_record_byte_ranges(csv_bytes_utf8):
    """
    Return a list of ``(start, end)`` byte offsets (end exclusive), one per
    CSV record in ``csv_bytes_utf8``.

    A ``\\n`` byte only ends a record when it is outside a quoted field.
    With the csv module's default dialect (quote character ``"``, embedded
    quotes doubled) that is exactly when an even number of ``"`` bytes has
    been seen so far. Counting bytes is safe in UTF-8 because 0x22 never
    occurs inside a multi-byte sequence.
    """
    record_ranges = []
    record_start = 0
    cursor = 0
    inside_quotes = False
    # Every chunk except the last one was terminated by a "\n" byte.
    *terminated_lines, _unterminated = csv_bytes_utf8.split(b"\n")
    for line in terminated_lines:
        cursor += len(line) + 1  # +1 is the \n character
        if line.count(b'"') % 2:
            inside_quotes = not inside_quotes
        if not inside_quotes:
            record_ranges.append((record_start, cursor))
            record_start = cursor

    # Bytes after the last complete record: a final row without a trailing
    # newline (or an unterminated quoted field). A lone utf-8-sig BOM is what
    # an empty table encodes to, so it is not treated as a row.
    tail = csv_bytes_utf8[record_start:]
    if tail and tail != b"\xef\xbb\xbf":
        record_ranges.append((record_start, len(csv_bytes_utf8)))
    return record_ranges


def distribute_task_csv(
    csv_bytes_utf8,
    func_to_run,
    bucket,
    header=None,
    func_kwargs=None,
    func_class=None,
    func_class_kwargs=None,
    catch=False,
    group_count=100,
    storage="s3",
    use_s3_env_token=True,
):
    """
    The same as distribute_task, but instead of a table, the
    first argument is bytes of a csv encoded into utf8.
    This function is used by distribute_task() which you should use instead.

    `Args:`
        csv_bytes_utf8: bytes
            CSV data rows (no header row), utf8-encoded, in the csv module's
            default dialect.
        func_to_run: function
            Function to run on each group of rows.
        bucket: str
            Bucket the whole csv is uploaded to.
        header: list
            Column names passed to every invocation and prepended to its rows.
        func_kwargs: dict
            Extra keyword arguments passed to ``func_to_run``.
        func_class: class
            Class that ``func_to_run`` belongs to, if any.
        func_class_kwargs: dict
            Passed along to each invocation for use with ``func_class``.
        catch: bool
            Passed along to each invocation; see process_task_portion().
        group_count: int
            Number of csv records handled by each invocation.
        storage: str
            "s3" uploads to S3; any other value uses the in-memory test storage.
            "local" additionally forces the invocations to run locally.
        use_s3_env_token: bool
            Passed to the S3 storage as ``use_env_token``.

    `Returns:`
        dict with debug information: the per-group run results and the
        response of the upload call.
    """
    func_name = get_func_task_path(func_to_run, func_class)

    # Byte range of every record. Quoted fields containing newlines stay
    # inside a single record, so a group boundary never splits a row.
    record_ranges = _csv_record_byte_ranges(csv_bytes_utf8)

    # group the records and get start/end bytes for each group
    group_ranges = []
    for first in range(0, len(record_ranges), group_count):
        last = min(first + group_count, len(record_ranges)) - 1
        group_ranges.append((record_ranges[first][0], record_ranges[last][1]))

    # upload data
    # NOTE: the key is derived from the current time only, so it is not
    # guaranteed to be unique across concurrent callers.
    filename = hash(time.time())
    storagekey = f"{S3_TEMP_KEY_PREFIX}/{filename}.csv"
    groupcount = len(group_ranges)
    logger.debug(f"distribute_task_csv storagekey {storagekey} w/ {groupcount} groups")

    if storage == "s3":
        response = S3Storage(use_env_token=use_s3_env_token).put_object(
            bucket, storagekey, csv_bytes_utf8
        )
    else:
        response = FAKE_STORAGE.put_object(bucket, storagekey, csv_bytes_utf8)

    # start processes
    results = [
        maybe_async_run(
            process_task_portion,
            [
                bucket,
                storagekey,
                group_start,
                group_end,
                func_name,
                header,
                storage,
                func_kwargs,
                catch,
                func_class_kwargs,
                use_s3_env_token,
            ],
            # if we are using local storage, then it must be run locally, as well
            # (good for testing/debugging)
            remote_aws_lambda_function_name="FORCE_LOCAL" if storage == "local" else None,
        )
        for group_start, group_end in group_ranges
    ]
    return {
        "DEBUG_ONLY": "results may vary depending on context/platform",
        "results": results,
        "put_response": response,
    }


def distribute_task(
    table,
    func_to_run,
    bucket=None,
    func_kwargs=None,
    func_class=None,
    func_class_kwargs=None,
    catch=False,
    group_count=100,
    storage="s3",
    use_s3_env_token=True,
):
    """
    Distribute processing rows in a table across multiple AWS Lambda invocations.

    `Args:`
        table: Parsons Table
            Table of data you wish to distribute processing across Lambda invocations
            of `func_to_run` argument.
        func_to_run: function
            The function you want to run whose
            first argument will be a subset of table
        bucket: str
            The bucket name to use for s3 upload to process the whole table
            Not required if you set environment variable ``S3_TEMP_BUCKET``
        func_kwargs: dict
            If the function has other arguments to pass along with `table`
            then provide them as a dict here. They must all be JSON-able.
        func_class: class
            If the function is a classmethod or function on a class,
            then pass the pure class here.
            E.g. If you passed `ActionKit.bulk_upload_table`,
            then you would pass `ActionKit` here.
        func_class_kwargs: dict
            If it is a class function, and the class must be instantiated,
            then pass the kwargs to instantiate the class here.
            E.g. If you passed `ActionKit.bulk_upload_table` as the function,
            then you would pass {'domain': ..., 'username': ... etc} here.
            This must all be JSON-able data.
        catch: bool
            Lambda will retry running an event three times if there's an
            exception -- if you want to prevent this, set `catch=True`
            and then it will catch any errors and stop retries.
            The error will be in CloudWatch logs with string "Distribute Error"
            This might be important if row-actions are not idempotent and your
            own function might fail causing repeats.
        group_count: int
            Set this to how many rows to process with each Lambda invocation (Default: 100)
        storage: str
            Debugging option: Defaults to "s3". To test distribution locally without s3,
            set to "local".
        use_s3_env_token: bool
            If storage is set to "s3", sets the use_env_token parameter on the S3 storage.

    `Returns:`
        Debug information -- do not rely on the output, as it will change
        depending on how this method is invoked.
    """
    if storage not in ("s3", "local"):
        raise DistributeTaskException("storage argument must be s3 or local")

    # Falls back to the S3_TEMP_BUCKET environment variable when bucket is not given.
    bucket = check("S3_TEMP_BUCKET", bucket)

    # Serialize the data rows (the header travels separately) into csv text.
    buffer = StringIO()
    csv.writer(buffer).writerows(table.table.data())
    encoded_rows = buffer.getvalue().encode("utf-8-sig")

    passthrough = {
        "header": table.columns,
        "func_kwargs": func_kwargs,
        "func_class": func_class,
        "func_class_kwargs": func_class_kwargs,
        "catch": catch,
        "group_count": group_count,
        "storage": storage,
        "use_s3_env_token": use_s3_env_token,
    }
    return distribute_task_csv(encoded_rows, func_to_run, bucket, **passthrough)
def process_task_portion(
    bucket,
    storagekey,
    rangestart,
    rangeend,
    func_name,
    header,
    storage="s3",
    func_kwargs=None,
    catch=False,
    func_class_kwargs=None,
    use_s3_env_token=True,
):
    """
    Run one group of rows from an uploaded csv through the target function.

    Reads bytes ``rangestart`` (inclusive) to ``rangeend`` (exclusive) of the
    stored csv, parses them into rows, prepends ``header`` and calls the
    function named by ``func_name`` with the resulting Table.

    `Args:`
        bucket: str
            Bucket holding the uploaded csv.
        storagekey: str
            Key of the uploaded csv.
        rangestart: int
            First byte of this group's rows.
        rangeend: int
            Byte just past the end of this group's rows.
        func_name: str
            Task path of the function to run, resolved with import_and_get_task.
        header: list
            Column names used as the first row of the Table.
        storage: str
            "s3" reads from S3; any other value reads the in-memory test storage.
        func_kwargs: dict
            Extra keyword arguments for the function. ``None`` means no extras.
        catch: bool
            If true, exceptions raised by the function are caught and returned
            as a dict instead of propagating.
        func_class_kwargs: dict
            Passed to import_and_get_task along with ``func_name``.
        use_s3_env_token: bool
            Passed to the S3 storage as ``use_env_token``.

    `Returns:`
        ``None`` normally. When ``catch`` is true and the function raised,
        a dict describing the error (traceback, byte range, function, location).
    """
    logger.debug(
        f"process_task_portion func_name {func_name}, "
        f"storagekey {storagekey}, byterange {rangestart}-{rangeend}"
    )
    func = import_and_get_task(func_name, func_class_kwargs)
    if storage == "s3":
        filedata = S3Storage(use_env_token=use_s3_env_token).get_range(
            bucket, storagekey, rangestart, rangeend
        )
    else:
        filedata = FAKE_STORAGE.get_range(bucket, storagekey, rangestart, rangeend)

    lines = list(csv.reader(TextIOWrapper(BytesIO(filedata), encoding="utf-8-sig")))
    table = Table([header] + lines)

    # func_kwargs defaults to None all the way up the call chain, and
    # ``**None`` raises TypeError, so normalize it to an empty mapping.
    call_kwargs = func_kwargs if func_kwargs is not None else {}

    if not catch:
        func(table, **call_kwargs)
        return None

    try:
        func(table, **call_kwargs)
    except Exception:
        # In Lambda you can search for '"Distribute Error"' in the logs
        type_, value_, traceback_ = sys.exc_info()
        err_traceback_str = "\n".join(traceback.format_exception(type_, value_, traceback_))
        return {
            "Exception": "Distribute Error",
            "error": err_traceback_str,
            "rangestart": rangestart,
            "rangeend": rangeend,
            "func_name": func_name,
            "bucket": bucket,
            "storagekey": storagekey,
        }
    return None