Source code for parsons.aws.aws_async

import importlib
import inspect
import json
import os

import boto3


"""

In lambda handler:

from parsons.aws import event_command

def handler(event, context):

    ## ADD THESE TWO LINES TO TOP OF HANDLER:
    if event_command(event, context):
         return

"""
try:
    from zappa.asynchronous import run as zappa_run
except ImportError:
    zappa_run = None


[docs]def event_command(event, context): """ Minimal shim to add to the top lambda handler function to enable distributed tasks In your lambda handler: .. code-block:: python :emphasize-lines: 5,6 from parsons.aws import event_command def handler(event, context): ## ADD THESE TWO LINES TO TOP OF HANDLER: if event_command(event, context): return The rest of this library is compatible with zappa.async library. If you have deployed your app with `Zappa <https://github.com/Miserlou/Zappa>`, then you do NOT need to add this shim. """ if not set(event).intersection({'task_path', 'args', 'kwargs'}): return False # did not match an event command func = import_and_get_task(event['task_path'], event.get('func_class_init_kwargs')) # if the func was decorated with zappa.async.task then run the real function func = getattr(func, 'sync', func) # DID match an event command # -- so probably don't do the usual thing the Lambda handler does return (func(*event['args'], **event['kwargs']) or True)
def run(func, args=[], kwargs={}, service='lambda', capture_response=False, remote_aws_lambda_function_name=None, remote_aws_region=None, func_class=None, func_class_init_kwargs=None, **task_kwargs): lambda_function_name = (remote_aws_lambda_function_name or os.environ.get('AWS_LAMBDA_FUNCTION_NAME')) if not lambda_function_name or lambda_function_name == 'FORCE_LOCAL': # We are neither running in Lambda environment, nor given one to invoke # so let's run it synchronously -- so code can be compatible both in-and-out of Lambda func(*args, **kwargs) return True # zappa has more robust and allows more configs -- but is not compatible with func_class if zappa_run and not func_class: return zappa_run(func, args, kwargs, service, capture_response, remote_aws_lambda_function_name, remote_aws_region, **task_kwargs) task_path = get_func_task_path(func, func_class) payload = (json.dumps({'task_path': task_path, 'args': args, 'kwargs': kwargs, 'func_class_init_kwargs': func_class_init_kwargs}) .encode('utf-8')) if len(payload) > 128000: # pragma: no cover raise AsyncException("Payload too large for async Lambda call") lambda_client = boto3.Session().client('lambda') response = lambda_client.invoke( FunctionName=lambda_function_name, InvocationType='Event', # makes the call async Payload=payload ) return response.get('StatusCode', 0) == 202 ## # Utility Functions ## def import_and_get_task(task_path, instance_init_kwargs=None): """ Given a modular path to a function, import that module and return the function. """ module, function = task_path.rsplit('.', 1) app_module = importlib.import_module(module) class_func = function.split('|') app_function = getattr(app_module, class_func[0]) if len(class_func) == 1: return app_function def init_and_run(*args, **kwargs): print('INITRUN', args, kwargs) if len(class_func) == 3: # instance instance = app_function # actually the class else: instance = app_function(**(instance_init_kwargs or {})) method = getattr(instance, class_func[1]) return method(*args, **kwargs) return init_and_run def get_func_task_path(func, method_class=None): """ Format the modular task path for a function via inspection. """ module_path = inspect.getmodule(method_class or func).__name__ func_name = func.__name__ # To support class methods, we need to see if it IS a method on a class # and then also determine if it is an instance method or a classmethod # Then we record that info with |'s to be decoded in import_and_get_task # classmethod format: "Foo|method|" # instance method format: "Foo|method" task_path = '{}.{}{}{}'.format( module_path, f'{method_class.__name__}|' if method_class else '', func_name, '|' if method_class and 'of <class' in repr(func) else '' ) return task_path class AsyncException(Exception): pass