import logging
import braintree
from parsons.etl.table import Table
from parsons.utilities.check_env import check as check_env
logger = logging.getLogger(__name__)
class ParsonsBraintreeError(Exception):
class Braintree(object):
Braintree is a payment processor.
merchant_id: str
Braintree merchant id -- probably a 16-char alphanumeric.
Not required if ``BRAINTREE_MERCHANT_ID`` env variable is set.
public_key: str
Braintree public key -- probably a (different) 16-char alphanumeric.
Not required if ``BRAINTREE_PUBLIC_KEY`` env variable is set.
private_key: str
Braintree private key -- probably a 32-char alphanumeric.
Not required if ``BRAINTREE_PRIVATE_KEY`` env variable is set.
timeout: int
Optionally change the timeout from default of 200 seconds.
Can also be passed with env var ``BRAINTREE_TIMEOUT``.
production: bool
Defaults to True. If you are testing in a Sandbox,
set this to False.
Braintree class
query_types = {
"dispute": braintree.DisputeSearch,
"transaction": braintree.TransactionSearch,
"subscription": braintree.SubscriptionSearch,
credit_card_fields = [
disbursement_fields = [
"disbursement_date", # => disbursement_date column
transaction_fields = [
dispute_fields = [
# '', # DOT id -- needs to be special-cased (below)
subscription_fields = [
# 'descriptor', # covered under descriptor_fields
# 'transactions', # special-cased
descriptor_fields = ["name", "phone", "url"]
customer_fields = ["first_name", "last_name", "email"]
def __init__(
merchant_id = check_env("BRAINTREE_MERCHANT_ID", merchant_id)
public_key = check_env("BRAINTREE_PUBLIC_KEY", public_key)
private_key = check_env("BRAINTREE_PRIVATE_KEY", private_key)
timeout = check_env("BRAINTREE_TIMEOUT", timeout, optional=True) or 200
self.gateway = braintree.BraintreeGateway(
if production
else braintree.Environment.Sandbox
def get_disputes(self, start_date=None, end_date=None, query_list=None, query_dict=None):
Get a table of disputes based on query parameters.
There are three ways to pass query arguments: Pass a start_date and end_date
together for a date range, or pass a query_list or query_dict argument.
start_date: date or str
Start date of the dispute range. Requires `end_date` arg. e.g. '2020-11-03'
end_date: date or str
End date of the dispute range. Requires `start_date` arg. e.g. '2020-11-03'
query_list: list of braintree.DisputeSearch
You can use the `braintree.DisputeSearch
to create a manual list of query parameters.
query_dict: jsonable-dict
query_dict is basically the same as query_list, except instead of using their API
objects, you can pass it in pure dictionary form.
Some examples:
.. highlight:: python
.. code-block:: python
# The start_date/end_date arguments are the same as
{"effective_date": {"between": [start_date, end_date]}}
# some other examples
{"merchant_account_id": {"in_list": [123, 456]}}
{"created_at": {"greater_than_or_equal": "2020-03-10"}}
Table Class
collection = self._get_collection(
{"effective_date": dict(between=[start_date, end_date])}
if start_date and end_date
else None
# Iterating on collection.items triggers web requests in batches of 50 records
# Disputes query api doesn't return the ids -- we can't do anything but iterate
if not collection.is_success:
raise ParsonsBraintreeError(f"Braintree dispute query failed: {collection.message}")
return Table(
[self._dispute_header()] + [self._dispute_to_row(r) for r in collection.disputes.items]
def get_subscriptions(
Get a table of subscriptions based on query parameters.
There are three ways to pass query arguments:
Pass a disbursement_start_date and disbursement_end_date together
for a date range, or pass a query_list or query_dict argument.
start_date: date or str
Start date of the subscription range. Requires `end_date` arg.
e.g. '2020-11-03'
end_date: date or str
End date of the subscription range. Requires `start_date` arg.
e.g. '2020-11-03'
query_list: list of braintree.SubscriptionSearch
You can use the `braintree.SubscriptionSearch
to create a manual list of query parameters.
query_dict: jsonable-dict
query_dict is basically the same as query_list, except instead of using their API
objects, you can pass it in pure dictionary form.
Some examples:
.. highlight:: python
.. code-block:: python
# The start_date/end_date arguments are the same as
{"created_at": {"between": [start_date, end_date]}}
# some other examples
{"merchant_account_id": {"in_list": [123, 456]}}
{"created_at": {"greater_than_or_equal": "2020-03-10"}}
include_transactions: bool
If this is true, include the full collection of transaction objects.
Otherwise, just return a list of transaction IDs.
just_ids: bool
While querying a list of subscription ids is a single, fast query to Braintree's
API, getting all data for each subscription is force-paginated at 50-records per
request. If you just need a count or the list of ids, then set `just_ids=True` and
it will return a single column with `id` instead of all table columns.
table_of_ids: Table with an `id` column -- i.e. a table returned from `just_ids=True`
Subsequently, after calling this with `just_ids`, you can prune/alter the ids table
and then pass the table back to get the full data.
These are somewhat-niche use-cases, but occasionally crucial
when a search result returns 1000s of ids.
Table Class
collection = self._get_collection(
{"created_at": dict(between=[start_date, end_date])}
if start_date and end_date
else None
query_count = len(collection.ids)
f"Braintree subscriptions search resulted in subscriptions count of {query_count}"
if just_ids:
return Table([("id",)] + [[item_id] for item_id in collection.ids])
# Iterating on collection.items triggers web requests in batches of 50 records
# This can be frustratingly slow :-(
# Also note: Braintree will push you to their new GraphQL API,
# but it, too, paginates with a max of 50 records
logger.debug("Braintree subscriptions iterating to build subscriptions table")
return Table(
+ [self._subscription_to_row(include_transactions, r) for r in collection.items]
def get_transactions(
Get a table of transactions based on query parameters.
There are three ways to pass query arguments:
Pass a disbursement_start_date and disbursement_end_date together
for a date range, or pass a query_list or query_dict argument.
disbursement_start_date: date or str
Start date of the disbursement range. Requires `disbursement_end_date` arg.
e.g. '2020-11-03'
disbursement_end_date: date or str
End date of the disbursement range. Requires `disbursement_start_date` arg.
e.g. '2020-11-03'
query_list: list of braintree.TransactionSearch
You can use the `braintree.TransactionSearch
to create a manual list of query parameters.
query_dict: jsonable-dict
query_dict is basically the same as query_list, except instead of using their API
objects, you can pass it in pure dictionary form.
Some examples:
.. highlight:: python
.. code-block:: python
# The disbursement_start_date/disbursement_end_date arguments are the same as
{"disbursement_date": {"between": [start_date, end_date]}}
# some other examples
{"merchant_account_id": {"in_list": [123, 456]}}
{"created_at": {"greater_than_or_equal": "2020-03-10"}}
just_ids: bool
While querying a list of transaction ids is a single, fast query to Braintree's API,
getting all data for each transaction is force-paginated at 50-records per request.
If you just need a count or the list of ids, then set `just_ids=True` and
it will return a single column with `id` instead of all table columns.
table_of_ids: Table with an `id` column -- i.e. a table returned from `just_ids=True`
Subsequently, after calling this with `just_ids`, you can prune/alter the ids table
and then pass the table back to get the full data.
These are somewhat-niche use-cases, but occasionally crucial
when a search result returns 1000s of ids.
Table Class
collection = self._get_collection(
"disbursement_date": dict(
between=[disbursement_start_date, disbursement_end_date]
if disbursement_start_date and disbursement_end_date
else None
query_count = len(collection.ids)"Braintree transactions resulted in transaction count of {query_count}")
if just_ids:
return Table([("id",)] + [[item_id] for item_id in collection.ids])
# Iterating on collection.items triggers web requests in batches of 50 records
# This can be frustratingly slow :-(
# Also note: Braintree will push you to their new GraphQL API,
# but it, too, paginates with a max of 50 records
logger.debug("Braintree transactions iterating to build transaction table")
return Table(
[self._transaction_header()] + [self._transaction_to_row(r) for r in collection.items]
def _dispute_header(self):
return self.dispute_fields + ["transaction_id"]
def _dispute_to_row(self, collection_item):
row = [getattr(collection_item, k) for k in self.dispute_fields]
# the single sub-attribute
return row
def _transaction_header(self):
return (
[f"credit_card_{k}" for k in self.credit_card_fields]
# annoying exception in column name
+ [
(f"disbursement_{k}" if k != "disbursement_date" else k)
for k in self.disbursement_fields
+ [f"customer_{k}" for k in self.customer_fields]
+ self.transaction_fields
def _transaction_to_row(self, collection_item):
return (
if getattr(collection_item, "credit_card", None)
else None
for k in self.credit_card_fields
+ [getattr(collection_item.disbursement_details, k) for k in self.disbursement_fields]
+ [getattr(collection_item.customer_details, k) for k in self.customer_fields]
+ [getattr(collection_item, k) for k in self.transaction_fields]
def _subscription_header(self, include_transactions):
if include_transactions:
return (
[f"descriptor_{k}" for k in self.descriptor_fields]
+ self.subscription_fields
+ ["transactions"]
return (
[f"descriptor_{k}" for k in self.descriptor_fields]
+ self.subscription_fields
+ ["transaction_ids"]
def _subscription_to_row(self, include_transactions, collection_item):
if include_transactions:
return (
[getattr(collection_item.descriptor, k) for k in self.descriptor_fields]
+ [getattr(collection_item, k) for k in self.subscription_fields]
+ [collection_item.transactions]
return (
[getattr(collection_item.descriptor, k) for k in self.descriptor_fields]
+ [getattr(collection_item, k) for k in self.subscription_fields]
+ [";".join( for t in collection_item.transactions)]
def _get_collection(
collection_query = None
collection = None
if query_list:
collection_query = query_list
elif query_dict:
collection_query = self._get_query_objects(query_type, **query_dict)
elif default_query:
collection_query = self._get_query_objects(query_type, **default_query)
if not collection_query:
raise ParsonsBraintreeError(
"You must pass some query parameters: "
"query_dict, start_date with end_date, or query_list"
if table_of_ids:
# We don't need to re-do the query, we can just reconstruct the query object
collection = self._create_collection(
query_type, table_of_ids.table.values("id"), collection_query
collection = getattr(self.gateway, query_type).search(*collection_query)
return collection
def _get_query_objects(self, query_type, **queryparams):
disbursement_date={'between': ['2020-03-20', '2020-03-27']}
merchant_account_id={'in_list': [123, 456]}
created_at={'greater_than_or_equal': '2020-03-10'}
queries = []
for node, filters in queryparams.items():
# Very meta programming here, abstracting braintree library's (dumb?) attribute style
# Example: braintree.DisputeSearch.effective_date
# Example: braintree.TransactionSearch.disbursement_date
queryobj = getattr(self.query_types[query_type], node, None)
if queryobj:
for qual, vals in filters.items(): # likely only one, but fine
queryobj_qualfunc = getattr(queryobj, qual, None)
if not queryobj_qualfunc:
raise ParsonsBraintreeError("oh no, that's not a braintree parameter")
if not isinstance(vals, list):
vals = [vals]
raise ParsonsBraintreeError("oh no, that's not a braintree parameter")
return queries
def _create_collection(self, query_type, ids, queries):
if (query_type == "transaction") or (query_type == "disbursement"):
gateway = braintree.TransactionGateway(self.gateway)
return braintree.ResourceCollection(
{"search_results": {"ids": list(ids), "page_size": 50}},
if query_type == "subscription":
gateway = braintree.SubscriptionGateway(self.gateway)
return braintree.ResourceCollection(
{"search_results": {"ids": list(ids), "page_size": 50}},