import importlib
import inspect
import json
import os
import boto3
"""
In lambda handler:
from parsons.aws import event_command
def handler(event, context):
## ADD THESE TWO LINES TO TOP OF HANDLER:
if event_command(event, context):
return
"""
try:
from zappa.asynchronous import run as zappa_run
except ImportError:
zappa_run = None
[docs]def event_command(event, context):
"""
Minimal `shim <https://en.wikipedia.org/wiki/Shim_(computing)>`_
to add to the top lambda handler function to enable distributed tasks
The rest of this library is compatible with zappa.async library.
If you have deployed your app with `Zappa <https://github.com/Miserlou/Zappa>`_,
then you do NOT need to add this shim.
"""
if not set(event).intersection({'task_path', 'args', 'kwargs'}):
return False # did not match an event command
func = import_and_get_task(event['task_path'], event.get('func_class_init_kwargs'))
# if the func was decorated with zappa.async.task then run the real function
func = getattr(func, 'sync', func)
# DID match an event command
# -- so probably don't do the usual thing the Lambda handler does
return (func(*event['args'], **event['kwargs'])
or True)
def run(func, args=[], kwargs={}, service='lambda', capture_response=False,
remote_aws_lambda_function_name=None, remote_aws_region=None,
func_class=None, func_class_init_kwargs=None, **task_kwargs):
lambda_function_name = (remote_aws_lambda_function_name
or os.environ.get('AWS_LAMBDA_FUNCTION_NAME'))
if not lambda_function_name or lambda_function_name == 'FORCE_LOCAL':
# We are neither running in Lambda environment, nor given one to invoke
# so let's run it synchronously -- so code can be compatible both in-and-out of Lambda
func(*args, **kwargs)
return True
# zappa has more robust and allows more configs -- but is not compatible with func_class
if zappa_run and not func_class:
return zappa_run(func, args, kwargs, service,
capture_response, remote_aws_lambda_function_name, remote_aws_region,
**task_kwargs)
task_path = get_func_task_path(func, func_class)
payload = (json.dumps({'task_path': task_path,
'args': args,
'kwargs': kwargs,
'func_class_init_kwargs': func_class_init_kwargs})
.encode('utf-8'))
if len(payload) > 128000: # pragma: no cover
raise AsyncException("Payload too large for async Lambda call")
lambda_client = boto3.Session().client('lambda')
response = lambda_client.invoke(
FunctionName=lambda_function_name,
InvocationType='Event', # makes the call async
Payload=payload
)
return response.get('StatusCode', 0) == 202
##
# Utility Functions
##
def import_and_get_task(task_path, instance_init_kwargs=None):
"""
Given a modular path to a function, import that module
and return the function.
"""
module, function = task_path.rsplit('.', 1)
app_module = importlib.import_module(module)
class_func = function.split('|')
app_function = getattr(app_module, class_func[0])
if len(class_func) == 1:
return app_function
def init_and_run(*args, **kwargs):
print('INITRUN', args, kwargs)
if len(class_func) == 3: # instance
instance = app_function # actually the class
else:
instance = app_function(**(instance_init_kwargs or {}))
method = getattr(instance, class_func[1])
return method(*args, **kwargs)
return init_and_run
def get_func_task_path(func, method_class=None):
"""
Format the modular task path for a function via inspection.
"""
module_path = inspect.getmodule(method_class or func).__name__
func_name = func.__name__
# To support class methods, we need to see if it IS a method on a class
# and then also determine if it is an instance method or a classmethod
# Then we record that info with |'s to be decoded in import_and_get_task
# classmethod format: "Foo|method|"
# instance method format: "Foo|method"
task_path = '{}.{}{}{}'.format(
module_path,
f'{method_class.__name__}|' if method_class else '',
func_name,
'|' if method_class and 'of <class' in repr(func) else ''
)
return task_path
class AsyncException(Exception):
pass