Source code for parsons.braintree.braintree

import logging

import braintree

from parsons.etl.table import Table
from parsons.utilities.check_env import check as check_env

logger = logging.getLogger(__name__)


class ParsonsBraintreeError(Exception):
    """Raised when a Braintree query cannot be built or a search fails."""


class Braintree(object):
    """
    Braintree is a payment processor.

    `Args:`
        merchant_id: str
            Braintree merchant id -- probably a 16-char alphanumeric.
            Not required if ``BRAINTREE_MERCHANT_ID`` env variable is set.
        public_key: str
            Braintree public key -- probably a (different) 16-char alphanumeric.
            Not required if ``BRAINTREE_PUBLIC_KEY`` env variable is set.
        private_key: str
            Braintree private key -- probably a 32-char alphanumeric.
            Not required if ``BRAINTREE_PRIVATE_KEY`` env variable is set.
        timeout: int
            Optionally change the timeout from default of 200 seconds.
            Can also be passed with env var ``BRAINTREE_TIMEOUT``.
        production: bool
            Defaults to True. If you are testing in a Sandbox, set this to False.
    `Returns:`
        Braintree class
    """

    # Maps the query_type names used internally to the braintree search classes
    # whose attributes (e.g. ``created_at``) build the individual query terms.
    query_types = {
        "dispute": braintree.DisputeSearch,
        "transaction": braintree.TransactionSearch,
        "subscription": braintree.SubscriptionSearch,
    }

    # Keys read from a transaction's ``credit_card`` mapping;
    # emitted as ``credit_card_<key>`` columns.
    credit_card_fields = [
        "bin",
        "card_type",
        "cardholder_name",
        "commercial",
        "country_of_issuance",
        "customer_location",
        "debit",
        "durbin_regulated",
        "expiration_month",
        "expiration_year",
        "healthcare",
        "image_url",
        "issuing_bank",
        "last_4",
        "payroll",
        "prepaid",
        "product_id",
        "token",
        "venmo_sdk",
    ]

    # Attributes read from a transaction's ``disbursement_details``.
    disbursement_fields = [
        "disbursement_date",  # => disbursement_date column
        "funds_held",
        "settlement_amount",
        "settlement_currency_exchange_rate",
        "settlement_currency_iso_code",
        "success",
    ]

    # Attributes read directly from each transaction.
    transaction_fields = [
        "additional_processor_response",
        "amount",
        "avs_error_response_code",
        "avs_postal_code_response_code",
        "avs_street_address_response_code",
        "channel",
        "created_at",
        "currency_iso_code",
        "cvv_response_code",
        "discount_amount",
        "escrow_status",
        "gateway_rejection_reason",
        "id",
        "master_merchant_account_id",
        "merchant_account_id",
        "order_id",
        "payment_instrument_type",
        "plan_id",
        "processor_authorization_code",
        "processor_response_code",
        "processor_response_text",
        "processor_settlement_response_code",
        "processor_settlement_response_text",
        "purchase_order_number",
        "recurring",
        "refund_id",
        "refunded_transaction_id",
        "service_fee_amount",
        "settlement_batch_id",
        "shipping_amount",
        "ships_from_postal_code",
        "status",
        "sub_merchant_account_id",
        "subscription_id",
        "tax_amount",
        "tax_exempt",
        "type",
        "updated_at",
        "voice_referral_number",
    ]

    # Attributes read directly from each dispute.
    dispute_fields = [
        "id",
        "amount_disputed",
        "amount_won",
        "case_number",
        "currency_iso_code",
        "kind",
        "merchant_account_id",
        "original_dispute_id",
        "processor_comments",
        "reason",
        "reason_code",
        "reason_description",
        "received_date",
        "reference_number",
        "reply_by_date",
        "status",
        # 'transaction.id',  # DOT id -- needs to be special-cased (below)
    ]

    # Attributes read directly from each subscription.
    subscription_fields = [
        "add_ons",
        "balance",
        "billing_day_of_month",
        "billing_period_end_date",
        "billing_period_start_date",
        "created_at",
        "current_billing_cycle",
        "days_past_due",
        "description",
        # 'descriptor',  # covered under descriptor_fields
        "discounts",
        "failure_count",
        "first_billing_date",
        "id",
        "merchant_account_id",
        "never_expires",
        "next_bill_amount",
        "next_billing_date",
        "next_billing_period_amount",
        "number_of_billing_cycles",
        "paid_through_date",
        "payment_method_token",
        "plan_id",
        "price",
        "status",
        "status_history",
        # 'transactions',  # special-cased
        "trial_duration",
        "trial_duration_unit",
        "trial_period",
        "updated_at",
    ]

    # Attributes read from a subscription's ``descriptor``.
    descriptor_fields = ["name", "phone", "url"]

    # Attributes read from a transaction's ``customer_details``.
    customer_fields = ["first_name", "last_name", "email"]

    def __init__(
        self,
        merchant_id=None,
        public_key=None,
        private_key=None,
        timeout=None,
        production=True,
    ):
        merchant_id = check_env("BRAINTREE_MERCHANT_ID", merchant_id)
        public_key = check_env("BRAINTREE_PUBLIC_KEY", public_key)
        private_key = check_env("BRAINTREE_PRIVATE_KEY", private_key)
        timeout = check_env("BRAINTREE_TIMEOUT", timeout, optional=True) or 200

        # A timeout that came from the environment is a string; the gateway
        # needs a number, so convert it here and fail early if it is not one.
        if isinstance(timeout, str):
            try:
                timeout = float(timeout)
            except ValueError as err:
                raise ParsonsBraintreeError(
                    f"BRAINTREE_TIMEOUT must be a number of seconds, got {timeout!r}"
                ) from err
            if timeout.is_integer():
                timeout = int(timeout)

        self.gateway = braintree.BraintreeGateway(
            braintree.Configuration(
                environment=(
                    braintree.Environment.Production
                    if production
                    else braintree.Environment.Sandbox
                ),
                merchant_id=merchant_id,
                public_key=public_key,
                private_key=private_key,
                timeout=timeout,
            )
        )

    def get_disputes(
        self, start_date=None, end_date=None, query_list=None, query_dict=None
    ):
        """
        Get a table of disputes based on query parameters.
        There are three ways to pass query arguments: Pass a start_date and end_date
        together for a date range, or pass a query_list or query_dict argument.

        `Args:`
            start_date: date or str
                Start date of the dispute range. Requires `end_date` arg.
                e.g. '2020-11-03'
            end_date: date or str
                End date of the dispute range. Requires `start_date` arg.
                e.g. '2020-11-03'
            query_list: list of braintree.DisputeSearch
                You can use the `braintree.DisputeSearch
                <https://developers.braintreepayments.com/reference/request/dispute/search/python>`_
                to create a manual list of query parameters.
            query_dict: jsonable-dict
                query_dict is basically the same as query_list, except instead of using their API
                objects, you can pass it in pure dictionary form.
                Some examples:

                .. highlight:: python
                .. code-block:: python

                  # The start_date/end_date arguments are the same as
                  {"effective_date": {"between": [start_date, end_date]}}
                  # some other examples
                  {"merchant_account_id": {"in_list": [123, 456]}}
                  {"created_at": {"greater_than_or_equal": "2020-03-10"}}
        `Returns:`
            Table Class
        `Raises:`
            ParsonsBraintreeError
                If no query parameters are given or the dispute search fails.
        """
        collection = self._get_collection(
            "dispute",
            query_list=query_list,
            query_dict=query_dict,
            default_query=(
                {"effective_date": dict(between=[start_date, end_date])}
                if start_date and end_date
                else None
            ),
        )

        # Iterating on collection.items triggers web requests in batches of 50 records
        # Disputes query api doesn't return the ids -- we can't do anything but iterate
        if not collection.is_success:
            raise ParsonsBraintreeError(
                f"Braintree dispute query failed: {collection.message}"
            )
        return Table(
            [self._dispute_header()]
            + [self._dispute_to_row(r) for r in collection.disputes.items]
        )

    def get_subscriptions(
        self,
        table_of_ids=None,
        start_date=None,
        end_date=None,
        query_list=None,
        query_dict=None,
        include_transactions=False,
        just_ids=False,
    ):
        """
        Get a table of subscriptions based on query parameters.
        There are three ways to pass query arguments: Pass a start_date and end_date
        together for a date range, or pass a query_list or query_dict argument.

        `Args:`
            start_date: date or str
                Start date of the subscription range. Requires `end_date` arg.
                e.g. '2020-11-03'
            end_date: date or str
                End date of the subscription range. Requires `start_date` arg.
                e.g. '2020-11-03'
            query_list: list of braintree.SubscriptionSearch
                You can use the `braintree.SubscriptionSearch
                <https://developers.braintreepayments.com/reference/request/subscription/search/python>`_
                to create a manual list of query parameters.
            query_dict: jsonable-dict
                query_dict is basically the same as query_list, except instead of using their API
                objects, you can pass it in pure dictionary form.
                Some examples:

                .. highlight:: python
                .. code-block:: python

                  # The start_date/end_date arguments are the same as
                  {"created_at": {"between": [start_date, end_date]}}
                  # some other examples
                  {"merchant_account_id": {"in_list": [123, 456]}}
                  {"created_at": {"greater_than_or_equal": "2020-03-10"}}
            include_transactions: bool
                If this is true, include the full collection of transaction objects.
                Otherwise, just return a list of transaction IDs.
            just_ids: bool
                While querying a list of subscription ids is a single, fast query to Braintree's
                API, getting all data for each subscription is force-paginated at 50-records per
                request. If you just need a count or the list of ids, then set `just_ids=True` and
                it will return a single column with `id` instead of all table columns.
            table_of_ids: Table with an `id` column -- i.e. a table returned from `just_ids=True`
                Subsequently, after calling this with `just_ids`, you can prune/alter the ids table
                and then pass the table back to get the full data.
                These are somewhat-niche use-cases, but occasionally crucial
                when a search result returns 1000s of ids.
        `Returns:`
            Table Class
        `Raises:`
            ParsonsBraintreeError
                If no query parameters are given or a query parameter is not recognized.
        """
        collection = self._get_collection(
            "subscription",
            table_of_ids=table_of_ids,
            query_list=query_list,
            query_dict=query_dict,
            default_query=(
                {"created_at": dict(between=[start_date, end_date])}
                if start_date and end_date
                else None
            ),
        )
        query_count = len(collection.ids)
        logger.info(
            "Braintree subscriptions search resulted in subscriptions count of %s",
            query_count,
        )
        if just_ids:
            return Table([("id",)] + [[item_id] for item_id in collection.ids])

        # Iterating on collection.items triggers web requests in batches of 50 records
        # This can be frustratingly slow :-(
        # Also note: Braintree will push you to their new GraphQL API,
        #   but it, too, paginates with a max of 50 records
        logger.debug("Braintree subscriptions iterating to build subscriptions table")
        return Table(
            [self._subscription_header(include_transactions)]
            + [
                self._subscription_to_row(include_transactions, r)
                for r in collection.items
            ]
        )

    def get_transactions(
        self,
        table_of_ids=None,
        disbursement_start_date=None,
        disbursement_end_date=None,
        query_list=None,
        query_dict=None,
        just_ids=False,
    ):
        """
        Get a table of transactions based on query parameters.
        There are three ways to pass query arguments: Pass a disbursement_start_date and
        disbursement_end_date together for a date range, or pass a query_list or query_dict
        argument.

        `Args:`
            disbursement_start_date: date or str
                Start date of the disbursement range. Requires `disbursement_end_date` arg.
                e.g. '2020-11-03'
            disbursement_end_date: date or str
                End date of the disbursement range. Requires `disbursement_start_date` arg.
                e.g. '2020-11-03'
            query_list: list of braintree.TransactionSearch
                You can use the `braintree.TransactionSearch
                <https://developers.braintreepayments.com/reference/request/transaction/search/python>`_
                to create a manual list of query parameters.
            query_dict: jsonable-dict
                query_dict is basically the same as query_list, except instead of using their API
                objects, you can pass it in pure dictionary form.
                Some examples:

                .. highlight:: python
                .. code-block:: python

                  # The disbursement_start_date/disbursement_end_date arguments are the same as
                  {"disbursement_date": {"between": [start_date, end_date]}}
                  # some other examples
                  {"merchant_account_id": {"in_list": [123, 456]}}
                  {"created_at": {"greater_than_or_equal": "2020-03-10"}}
            just_ids: bool
                While querying a list of transaction ids is a single, fast query to Braintree's API,
                getting all data for each transaction is force-paginated at 50-records per request.
                If you just need a count or the list of ids, then set `just_ids=True` and it will
                return a single column with `id` instead of all table columns.
            table_of_ids: Table with an `id` column -- i.e. a table returned from `just_ids=True`
                Subsequently, after calling this with `just_ids`, you can prune/alter the ids table
                and then pass the table back to get the full data.
                These are somewhat-niche use-cases, but occasionally crucial
                when a search result returns 1000s of ids.
        `Returns:`
            Table Class
        `Raises:`
            ParsonsBraintreeError
                If no query parameters are given or a query parameter is not recognized.
        """
        collection = self._get_collection(
            "transaction",
            table_of_ids=table_of_ids,
            query_list=query_list,
            query_dict=query_dict,
            default_query=(
                {
                    "disbursement_date": dict(
                        between=[disbursement_start_date, disbursement_end_date]
                    )
                }
                if disbursement_start_date and disbursement_end_date
                else None
            ),
        )
        query_count = len(collection.ids)
        logger.info(
            "Braintree transactions resulted in transaction count of %s", query_count
        )
        if just_ids:
            return Table([("id",)] + [[item_id] for item_id in collection.ids])

        # Iterating on collection.items triggers web requests in batches of 50 records
        # This can be frustratingly slow :-(
        # Also note: Braintree will push you to their new GraphQL API,
        #   but it, too, paginates with a max of 50 records
        logger.debug("Braintree transactions iterating to build transaction table")
        return Table(
            [self._transaction_header()]
            + [self._transaction_to_row(r) for r in collection.items]
        )

    def _dispute_header(self):
        """Return the column names for a disputes table."""
        return self.dispute_fields + ["transaction_id"]

    def _dispute_to_row(self, collection_item):
        """Return one dispute as a row matching ``_dispute_header``."""
        row = [getattr(collection_item, k) for k in self.dispute_fields]
        # the single sub-attribute
        row.append(collection_item.transaction.id)
        return row

    def _transaction_header(self):
        """Return the column names for a transactions table."""
        return (
            [f"credit_card_{k}" for k in self.credit_card_fields]
            # annoying exception in column name:
            # "disbursement_date" already carries the prefix
            + [
                (f"disbursement_{k}" if k != "disbursement_date" else k)
                for k in self.disbursement_fields
            ]
            + [f"customer_{k}" for k in self.customer_fields]
            + self.transaction_fields
        )

    def _transaction_to_row(self, collection_item):
        """Return one transaction as a row matching ``_transaction_header``."""
        # credit_card may be missing or empty; every credit card column is
        # then None.
        credit_card = getattr(collection_item, "credit_card", None)
        return (
            [
                (credit_card.get(k) if credit_card else None)
                for k in self.credit_card_fields
            ]
            + [
                getattr(collection_item.disbursement_details, k)
                for k in self.disbursement_fields
            ]
            + [
                getattr(collection_item.customer_details, k)
                for k in self.customer_fields
            ]
            + [getattr(collection_item, k) for k in self.transaction_fields]
        )

    def _subscription_header(self, include_transactions):
        """
        Return the column names for a subscriptions table.

        The last column is ``transactions`` when ``include_transactions`` is
        true, otherwise ``transaction_ids``.
        """
        last_column = "transactions" if include_transactions else "transaction_ids"
        return (
            [f"descriptor_{k}" for k in self.descriptor_fields]
            + self.subscription_fields
            + [last_column]
        )

    def _subscription_to_row(self, include_transactions, collection_item):
        """
        Return one subscription as a row matching ``_subscription_header``.

        The last cell holds the subscription's transactions as-is when
        ``include_transactions`` is true, otherwise their ids joined with ";".
        """
        if include_transactions:
            transactions_cell = collection_item.transactions
        else:
            transactions_cell = ";".join(t.id for t in collection_item.transactions)
        return (
            [getattr(collection_item.descriptor, k) for k in self.descriptor_fields]
            + [getattr(collection_item, k) for k in self.subscription_fields]
            + [transactions_cell]
        )

    def _get_collection(
        self,
        query_type,
        table_of_ids=None,
        query_list=None,
        query_dict=None,
        default_query=None,
    ):
        """
        Build the query and return the matching braintree search result.

        The first truthy of ``query_list``, ``query_dict`` and
        ``default_query`` is used as the query. When ``table_of_ids`` is
        given, the search is not re-run; a collection is rebuilt from the
        table's ``id`` column instead.

        Raises ParsonsBraintreeError when none of the three query arguments
        yields any query terms.
        """
        collection_query = None
        if query_list:
            collection_query = query_list
        elif query_dict:
            collection_query = self._get_query_objects(query_type, **query_dict)
        elif default_query:
            collection_query = self._get_query_objects(query_type, **default_query)

        if not collection_query:
            raise ParsonsBraintreeError(
                "You must pass some query parameters: "
                "query_dict, start_date with end_date, or query_list"
            )

        # Compare against None rather than relying on truthiness: an ids
        # table that was pruned down to zero rows must yield an empty
        # collection, not silently re-run the full search.
        if table_of_ids is not None:
            # We don't need to re-do the query, we can just reconstruct the query object
            return self._create_collection(
                query_type, table_of_ids.table.values("id"), collection_query
            )
        return getattr(self.gateway, query_type).search(*collection_query)

    def _get_query_objects(self, query_type, **queryparams):
        """
        Translate dictionary-style query parameters into braintree query objects.

        Examples:
        disbursement_date={'between': ['2020-03-20', '2020-03-27']}
        merchant_account_id={'in_list': [123, 456]}
        created_at={'greater_than_or_equal': '2020-03-10'}

        Raises ParsonsBraintreeError when a field name or a qualifier is not
        an attribute of the corresponding braintree search object.
        """
        queries = []
        for node, filters in queryparams.items():
            # Very meta programming here, abstracting braintree library's (dumb?) attribute style
            # Example: braintree.DisputeSearch.effective_date
            # Example: braintree.TransactionSearch.disbursement_date
            queryobj = getattr(self.query_types[query_type], node, None)
            if not queryobj:
                raise ParsonsBraintreeError("oh no, that's not a braintree parameter")
            for qual, vals in filters.items():  # likely only one, but fine
                queryobj_qualfunc = getattr(queryobj, qual, None)
                if not queryobj_qualfunc:
                    raise ParsonsBraintreeError(
                        "oh no, that's not a braintree parameter"
                    )
                # Qualifier functions are called with positional values, so a
                # scalar is wrapped in a list before unpacking.
                if not isinstance(vals, list):
                    vals = [vals]
                queries.append(queryobj_qualfunc(*vals))
        return queries

    def _create_collection(self, query_type, ids, queries):
        """
        Rebuild a braintree ResourceCollection from known ids without searching.

        Only transaction ("transaction" / "disbursement") and "subscription"
        query types are supported; any other type raises
        ParsonsBraintreeError instead of implicitly returning None.
        """
        # The fetch callables are name-mangled private methods on the
        # braintree gateway classes, hence the _ClassName__fetch spelling.
        if query_type in ("transaction", "disbursement"):
            gateway = braintree.TransactionGateway(self.gateway)
            fetch = gateway._TransactionGateway__fetch
        elif query_type == "subscription":
            gateway = braintree.SubscriptionGateway(self.gateway)
            fetch = gateway._SubscriptionGateway__fetch
        else:
            raise ParsonsBraintreeError(
                f"Cannot build a {query_type} collection from a table of ids"
            )
        return braintree.ResourceCollection(
            queries,
            {"search_results": {"ids": list(ids), "page_size": 50}},
            method=fetch,
        )