# Source code for (module path missing from this copy of the file)

import csv
from io import TextIOWrapper, BytesIO, StringIO
import logging
import sys
import traceback
import time

from import (
    run as maybe_async_run,
from import S3
from parsons.etl.table import Table
from parsons.utilities.check_env import check

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class DistributeTaskException(Exception):
    """Raised when a distribute-task call is given invalid arguments."""

class TestStorage:
    """
    In-memory stand-in for the S3 storage backend.

    Objects are kept in a plain dict keyed by ``key`` only; the ``bucket``
    argument is accepted for interface parity with the S3 backend but is
    not used.
    """

    def __init__(self):
        # key -> bytes.
        # NOTE(review): the attribute name was lost in this copy of the
        # file; ``data`` is assumed -- confirm against upstream.
        self.data = {}

    def put_object(self, bucket, key, object_bytes):
        """Store ``object_bytes`` under ``key``, replacing any previous value."""
        self.data[key] = object_bytes

    def get_range(self, bucket, key, rangestart, rangeend):
        """
        Return the bytes of ``key`` from ``rangestart`` up to, but not
        including, ``rangeend`` (ordinary Python slice semantics).
        """
        return self.data[key][rangestart:rangeend]

class S3Storage:
    """
    Thin wrapper over the S3 client exposing only what this module needs.

    These methods are pretty specialized, so they are kept inside this
    file rather than in the general-purpose S3 connector.
    """

    def __init__(self, use_env_token=True):
        self.s3 = S3(use_env_token=use_env_token)

    def put_object(self, bucket, key, object_bytes, **kwargs):
        """
        Upload ``object_bytes`` to ``bucket``/``key``.

        Extra keyword arguments are forwarded to the client's
        ``put_object`` call, whose response is returned unchanged.
        """
        return self.s3.client.put_object(Bucket=bucket, Key=key, Body=object_bytes, **kwargs)

    def get_range(self, bucket, key, rangestart, rangeend):
        """
        Gets an explicit byte-range of an S3 file.

        ``rangestart``/``rangeend`` follow Python slice semantics, i.e.
        ``rangeend`` is exclusive.
        """
        # bytes is INCLUSIVE for the rangeend parameter, unlike python
        # so e.g. while python returns 2 bytes for data[2:4]
        # Range: bytes=2-4 will return 3!! So we subtract 1
        byte_range = "bytes={}-{}".format(rangestart, rangeend - 1)
        response = self.s3.client.get_object(Bucket=bucket, Key=key, Range=byte_range)
        return response["Body"].read()

# Shared in-memory store used in place of S3 when storage is not "s3".
FAKE_STORAGE = TestStorage()
# Key prefix for the temporary CSV objects uploaded by distribute_task_csv.
S3_TEMP_KEY_PREFIX = "Parsons_DistributeTask"

# NOTE(review): this definition was damaged in this copy of the file (the
# parameter list, parts of the row-range loop, an ``else:`` and the dispatch
# call were missing). The parameter names below come from the call made by
# distribute_task() and from the names the body uses; the defaults mirror
# distribute_task()/process_task_portion(). Confirm against upstream.
def distribute_task_csv(
    csv_bytes_utf8,
    func_to_run,
    bucket,
    header=None,
    func_kwargs=None,
    func_class=None,
    func_class_kwargs=None,
    catch=False,
    group_count=100,
    storage="s3",
    use_s3_env_token=True,
):
    """
    The same as distribute_task, but instead of a table, the
    first argument is bytes of a csv encoded into utf8.
    This function is used by distribute_task() which you should use instead.

    The CSV is uploaded once under a temporary key; each worker is then
    handed only the byte range covering its group of rows.

    `Returns:`
        dict
            Debug information: the per-group results of the dispatch call
            and the storage response from the upload.
    """
    func_name = get_func_task_path(func_to_run, func_class)
    row_chunks = csv_bytes_utf8.split(b"\n")
    cursor = 0
    row_ranges = []
    # gather start/end bytes for each row
    for rowc in row_chunks:
        rng = [cursor]
        cursor = cursor + len(rowc) + 1  # +1 is the \n character
        rng.append(cursor)
        row_ranges.append(rng)

    # group the rows and get start/end bytes for each group
    group_ranges = []
    # table csv writer appends a terminal \r\n, so we do len-1
    for grpstep in range(0, len(row_ranges) - 1, group_count):
        end = min(len(row_ranges) - 1, grpstep + group_count - 1)
        group_ranges.append((row_ranges[grpstep][0], row_ranges[end][1]))

    # upload data
    filename = hash(time.time())
    storagekey = f"{S3_TEMP_KEY_PREFIX}/{filename}.csv"
    groupcount = len(group_ranges)
    logger.debug(f"distribute_task_csv storagekey {storagekey} w/ {groupcount} groups")

    response = None
    if storage == "s3":
        response = S3Storage(use_env_token=use_s3_env_token).put_object(
            bucket, storagekey, csv_bytes_utf8
        )
    else:
        response = FAKE_STORAGE.put_object(bucket, storagekey, csv_bytes_utf8)

    # start processes
    # NOTE(review): the callable and positional-argument list of this call
    # were lost; they are rebuilt here in process_task_portion's parameter
    # order -- confirm against maybe_async_run's signature.
    results = [
        maybe_async_run(
            process_task_portion,
            [
                bucket,
                storagekey,
                grp[0],
                grp[1],
                func_name,
                header,
                storage,
                func_kwargs,
                catch,
                func_class_kwargs,
                use_s3_env_token,
            ],
            # if we are using local storage, then it must be run locally, as well
            # (good for testing/debugging)
            remote_aws_lambda_function_name="FORCE_LOCAL" if storage == "local" else None,
        )
        for grp in group_ranges
    ]
    return {
        "DEBUG_ONLY": "results may vary depending on context/platform",
        "results": results,
        "put_response": response,
    }

def distribute_task(
    table,
    func_to_run,
    bucket=None,
    func_kwargs=None,
    func_class=None,
    func_class_kwargs=None,
    catch=False,
    group_count=100,
    storage="s3",
    use_s3_env_token=True,
):
    """
    Distribute processing rows in a table across multiple AWS Lambda invocations.

    `Args:`
        table: Parsons Table
            Table of data you wish to distribute processing across Lambda invocations
            of `func_to_run` argument.
        func_to_run: function
            The function you want to run whose
            first argument will be a subset of table
        bucket: str
            The bucket name to use for s3 upload to process the whole table
            Not required if you set environment variable ``S3_TEMP_BUCKET``
        func_kwargs: dict
            If the function has other arguments to pass along with `table`
            then provide them as a dict here. They must all be JSON-able.
        func_class: class
            If the function is a classmethod or function on a class,
            then pass the pure class here.
            E.g. If you passed `ActionKit.bulk_upload_table`,
            then you would pass `ActionKit` here.
        func_class_kwargs: dict
            If it is a class function, and the class must be instantiated,
            then pass the kwargs to instantiate the class here.
            E.g. If you passed `ActionKit.bulk_upload_table` as the function,
            then you would pass {'domain': ..., 'username': ... etc} here.
            This must all be JSON-able data.
        catch: bool
            Lambda will retry running an event three times if there's an
            exception -- if you want to prevent this, set `catch=True`
            and then it will catch any errors and stop retries.
            The error will be in CloudWatch logs with string "Distribute Error"
            This might be important if row-actions are not idempotent and your
            own function might fail causing repeats.
        group_count: int
            Set this to how many rows to process with each Lambda invocation (Default: 100)
        storage: str
            Debugging option: Defaults to "s3". To test distribution locally without s3,
            set to "local".
        use_s3_env_token: bool
            If storage is set to "s3", sets the use_env_token parameter on the S3 storage.
    `Returns:`
        Debug information -- do not rely on the output, as it will change
        depending on how this method is invoked.
    `Raises:`
        DistributeTaskException
            If `storage` is neither "s3" nor "local".
    """
    if storage not in ("s3", "local"):
        raise DistributeTaskException("storage argument must be s3 or local")
    bucket = check("S3_TEMP_BUCKET", bucket)
    csvdata = StringIO()
    outcsv = csv.writer(csvdata)
    # NOTE(review): the argument to writerows() was lost in this copy of the
    # file. Data rows WITHOUT the header are assumed, because the header is
    # passed separately below and re-attached to every chunk by the worker.
    # Confirm against upstream.
    outcsv.writerows(table.table.data())
    return distribute_task_csv(
        csvdata.getvalue().encode("utf-8-sig"),
        func_to_run,
        bucket,
        header=table.columns,
        func_kwargs=func_kwargs,
        func_class=func_class,
        func_class_kwargs=func_class_kwargs,
        catch=catch,
        group_count=group_count,
        storage=storage,
        use_s3_env_token=use_s3_env_token,
    )
def process_task_portion(
    bucket,
    storagekey,
    rangestart,
    rangeend,
    func_name,
    header,
    storage="s3",
    func_kwargs=None,
    catch=False,
    func_class_kwargs=None,
    use_s3_env_token=True,
):
    """
    Run the target function on one byte-range slice of an uploaded CSV.

    `Args:`
        bucket: str
            Bucket holding the uploaded CSV.
        storagekey: str
            Key of the uploaded CSV.
        rangestart: int
            First byte of the slice.
        rangeend: int
            End byte of the slice, passed through to the storage backend's
            ``get_range``.
        func_name: str
            Identifier resolved to the callable via ``import_and_get_task``.
        header: list
            Column names prepended to the parsed rows to build the Table.
        storage: str
            "s3" reads from S3; any other value reads from the in-memory store.
        func_kwargs: dict
            Extra keyword arguments for the function. ``None`` means none.
        catch: bool
            If True, exceptions raised by the function are caught and
            reported in the return value instead of propagating.
        func_class_kwargs: dict
            Passed to ``import_and_get_task`` along with ``func_name``.
        use_s3_env_token: bool
            Forwarded to the S3 storage when ``storage`` is "s3".
    `Returns:`
        ``None`` normally. If ``catch`` is True and the function raised, a
        dict describing the failure (marker string "Distribute Error",
        traceback text, and the slice coordinates).
    """
    logger.debug(
        f"process_task_portion func_name {func_name}, "
        f"storagekey {storagekey}, byterange {rangestart}-{rangeend}"
    )
    # The default is None, and ``func(table, **None)`` raises TypeError.
    if func_kwargs is None:
        func_kwargs = {}

    func = import_and_get_task(func_name, func_class_kwargs)
    if storage == "s3":
        filedata = S3Storage(use_env_token=use_s3_env_token).get_range(
            bucket, storagekey, rangestart, rangeend
        )
    else:
        filedata = FAKE_STORAGE.get_range(bucket, storagekey, rangestart, rangeend)

    lines = list(csv.reader(TextIOWrapper(BytesIO(filedata), encoding="utf-8-sig")))
    table = Table([header] + lines)

    if not catch:
        func(table, **func_kwargs)
        return None

    try:
        func(table, **func_kwargs)
    except Exception:
        # In Lambda you can search for '"Distribute Error"' in the logs
        type_, value_, traceback_ = sys.exc_info()
        err_traceback_str = "\n".join(traceback.format_exception(type_, value_, traceback_))
        return {
            "Exception": "Distribute Error",
            "error": err_traceback_str,
            "rangestart": rangestart,
            "rangeend": rangeend,
            "func_name": func_name,
            "bucket": bucket,
            "storagekey": storagekey,
        }
    return None