Source code for parsons.aws.aws_async

import importlib
import inspect
import json
import os

import boto3


"""

In lambda handler:

from parsons.aws import event_command

def handler(event, context):

    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
         return

"""
try:
    from zappa.asynchronous import run as zappa_run
except ImportError:
    zappa_run = None


[docs]def event_command(event, context): """ Minimal `shim <https://en.wikipedia.org/wiki/Shim_(computing)>`_ to add to the top lambda handler function to enable distributed tasks The rest of this library is compatible with zappa.async library. If you have deployed your app with `Zappa <https://github.com/Miserlou/Zappa>`_, then you do NOT need to add this shim. """ if not set(event).intersection({"task_path", "args", "kwargs"}): return False # did not match an event command func = import_and_get_task(event["task_path"], event.get("func_class_init_kwargs")) # if the func was decorated with zappa.async.task then run the real function func = getattr(func, "sync", func) # DID match an event command # -- so probably don't do the usual thing the Lambda handler does return func(*event["args"], **event["kwargs"]) or True
def run( func, args=[], kwargs={}, service="lambda", capture_response=False, remote_aws_lambda_function_name=None, remote_aws_region=None, func_class=None, func_class_init_kwargs=None, **task_kwargs, ): lambda_function_name = remote_aws_lambda_function_name or os.environ.get( "AWS_LAMBDA_FUNCTION_NAME" ) if not lambda_function_name or lambda_function_name == "FORCE_LOCAL": # We are neither running in Lambda environment, nor given one to invoke # so let's run it synchronously -- so code can be compatible both in-and-out of Lambda func(*args, **kwargs) return True # zappa has more robust and allows more configs -- but is not compatible with func_class if zappa_run and not func_class: return zappa_run( func, args, kwargs, service, capture_response, remote_aws_lambda_function_name, remote_aws_region, **task_kwargs, ) task_path = get_func_task_path(func, func_class) payload = json.dumps( { "task_path": task_path, "args": args, "kwargs": kwargs, "func_class_init_kwargs": func_class_init_kwargs, } ).encode("utf-8") if len(payload) > 128000: # pragma: no cover raise AsyncException("Payload too large for async Lambda call") lambda_client = boto3.Session().client("lambda") response = lambda_client.invoke( FunctionName=lambda_function_name, InvocationType="Event", # makes the call async Payload=payload, ) return response.get("StatusCode", 0) == 202 ## # Utility Functions ## def import_and_get_task(task_path, instance_init_kwargs=None): """ Given a modular path to a function, import that module and return the function. """ module, function = task_path.rsplit(".", 1) app_module = importlib.import_module(module) class_func = function.split("|") app_function = getattr(app_module, class_func[0]) if len(class_func) == 1: return app_function def init_and_run(*args, **kwargs): print("INITRUN", args, kwargs) if len(class_func) == 3: # instance instance = app_function # actually the class else: instance = app_function(**(instance_init_kwargs or {})) method = getattr(instance, class_func[1]) return method(*args, **kwargs) return init_and_run def get_func_task_path(func, method_class=None): """ Format the modular task path for a function via inspection. """ module_path = inspect.getmodule(method_class or func).__name__ func_name = func.__name__ # To support class methods, we need to see if it IS a method on a class # and then also determine if it is an instance method or a classmethod # Then we record that info with |'s to be decoded in import_and_get_task # classmethod format: "Foo|method|" # instance method format: "Foo|method" task_path = "{}.{}{}{}".format( module_path, f"{method_class.__name__}|" if method_class else "", func_name, "|" if method_class and "of <class" in repr(func) else "", ) return task_path class AsyncException(Exception): pass