Source code for parsons.copper.copper

from requests import request
import math
import json
import time
from parsons.etl import Table
from parsons.utilities import check_env
import logging

logger = logging.getLogger(__name__)

COPPER_URI = "https://api.prosperworks.com/developer_api/v1"


[docs]class Copper(object): """ Instantiate Copper Class `Args:` user_email: The email of the API user for Copper. Not required if ``COPPER_USER_EMAIL`` env variable set. api_key: The Copper provided application key. Not required if ``COPPER_API_KEY`` env. variable set. `Returns:` Copper Class """ def __init__(self, user_email=None, api_key=None): self.api_key = check_env.check('COPPER_API_KEY', api_key) self.user_email = check_env.check('COPPER_USER_EMAIL', user_email) self.uri = COPPER_URI def base_request(self, endpoint, req_type, page=1, page_size=200, filters=None): # Internal Request Method url = self.uri + endpoint # Authentication must be done through headers, requests HTTPBasicAuth doesn't work headers = { 'X-PW-AccessToken': self.api_key, 'X-PW-Application': "developer_api", 'X-PW-UserEmail': self.user_email, 'Content-Type': "application/json" } payload = {} if filters is not None: if len(filters) > 0 and isinstance(filters, dict): payload.update(filters) # GET request with non-None data arg is malformed if req_type == 'GET': return request(req_type, url, params=json.dumps(payload), headers=headers) else: payload["page_number"] = page payload["page_size"] = page_size return request(req_type, url, data=json.dumps(payload), headers=headers) def paginate_request(self, endpoint, req_type, page_size=200, filters=None): # Internal pagination method page = 1 total_pages = 2 blob = [] only_page = False if isinstance(filters, dict): # Assume user wants just that page if page_number specified in filters if 'page_number' in filters: page = filters['page_number'] # Ensure exactly one loop total_pages = page rows = f'{str(page_size)} or less' only_page = True else: filters = {} while page <= total_pages: r = self.base_request( endpoint, req_type, page_size=page_size, page=page, filters=filters ) if page == 1: if 'X-Pw-Total' in r.headers and not only_page: rows = r.headers['X-Pw-Total'] total_pages = int(math.ceil(int(rows)/float(page_size))) else: rows = f'{str(page_size)} or less' total_pages = 1 logger.info(f"Retrieving page {page} of {total_pages}, total rows: {rows}") page += 1 if r.text == "": return [] # Avoid too many layers of nesting if possible if isinstance(json.loads(r.text), list): blob.extend(json.loads(r.text)) else: blob.append(json.loads(r.text)) # Wait for 1 second to avoid hitting rate limits time.sleep(1) return blob
[docs] def get_people(self, filters=None, tidy=False): """ Get people `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#9c15869b-c894-4fa2-9346-d65a6602c129>`_ for choices `tidy: boolean or int` Optional; unpack list and dict columns as additional rows instead of columns If `True`: creates new table out of unpacked rows If 'int': adds rows to original table if max rows per key <= given number (so `tidy=0` guarantees new table) `Returns:` List of dicts of Parsons Tables: * people * people_emails * people_phone_numbers * people_custom_fields * people_socials * people_websites """ # noqa: E501,E261 return self.get_standard_object("people", filters=filters, tidy=tidy)
[docs] def get_companies(self, filters=None, tidy=False): """ Get companies `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#0b4f267f-3180-4041-861c-13f3cf17bcf9>`_ for choices `tidy: boolean or int` Optional; unpack list and dict columns as additional rows instead of columns If `True`: creates new table out of unpacked rows If 'int': adds rows to original table if max rows per key <= given number (so `tidy=0` guarantees new table) `Returns:` List of dicts of Parsons Tables: * companies * companies_phone_numbers * companies_custom_fields * companies_socials * companies_websites """ # noqa: E501,E261 return self.get_standard_object("companies", filters=filters, tidy=tidy)
[docs] def get_activities(self, filters=None, tidy=False): """ Get activities `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#d2e6ddd8-6699-4ff3-87e3-1febb0410dc9>`_ for choices Optional; unpack list and dict columns as additional rows instead of columns If `True`: creates new table out of unpacked rows If 'int': adds rows to original table if max rows per key <= given number (so `tidy=0` guarantees new table) `Returns:` List of dicts of Parsons Tables: * activities """ # noqa: E501,E261 return self.get_standard_object("activities", filters=filters, tidy=tidy)
[docs] def get_opportunities(self, filters=None, tidy=False): """ Get opportunities (i.e. donations) `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#5bb8adc1-137f-46bf-aa86-7df037840e57>`_ for choices Optional; unpack list and dict columns as additional rows instead of columns If `True`: creates new table out of unpacked rows If 'int': adds rows to original table if max rows per key <= given number (so `tidy=0` guarantees new table) `Returns:` List of dicts of Parsons Tables: * opportunities * opportunities_custom_fields """ # noqa: E501,E261 return self.get_standard_object("opportunities", filters=filters, tidy=tidy)
def get_standard_object(self, object_name, filters=None, tidy=False): # Retrieve and process a standard endpoint object (e.g. people, companies, etc.) logger.info(f"Retrieving {object_name} records.") blob = self.paginate_request(f"/{object_name}/search", req_type='POST', filters=filters) return self.process_json(blob, object_name, tidy=tidy)
[docs] def get_custom_fields(self): """ Get custom fields `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#bf389290-0c19-46a7-85bf-f5e6884fa4e1>`_ for choices `Returns:` List of dicts of Parsons Tables: * custom_fields * custom_fields_available * custom_fields_options """ # noqa: E501,E261 logger.info("Retrieving custom fields.") blob = self.paginate_request('/custom_field_definitions/', req_type='GET') return self.process_custom_fields(blob)
[docs] def get_activity_types(self): """ Get activity types `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#6bd339f1-f0de-48b4-8c34-5a5e245e036f>`_ for choices `Returns:` List of dicts of Parsons Tables: * activitiy_types """ # noqa: E501,E261 logger.info("Retrieving activity types.") response = self.paginate_request('/activity_types/', req_type='GET') orig_table = Table(response) at_user = orig_table.long_table([], 'user', prepend=False) at_sys = orig_table.long_table([], 'system', prepend=False) Table.concat(at_sys, at_user) return [{'name': 'activity_types', 'tbl': at_sys}]
[docs] def get_contact_types(self): """ Get contact types `Args:` `filters: dict` Optional; pass additional parameters to filter the records returned. See `Copper documentation <https://developer.copper.com/?version=latest#8b6e6ed8-c594-4eed-a2af-586aa2100f09>`_ for choices `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ # noqa: E501,E261 response = self.paginate_request('/contact_types/', req_type='GET') return Table(response)
def process_json(self, json_blob, obj_type, tidy=False): # Internal method for converting most types of json responses into a list of Parsons tables # Output goes here table_list = [] # Original table & columns obj_table = Table(json_blob) cols = obj_table.get_columns_type_stats() list_cols = [x['name'] for x in cols if 'list' in x['type']] dict_cols = [x['name'] for x in cols if 'dict' in x['type']] # Unpack all list columns if len(list_cols) > 0: for l in list_cols: # Check for nested data list_rows = obj_table.select_rows( lambda row: isinstance(row[l], list) and any(isinstance(x, dict) for x in row[l]) ) # Add separate long table for each column with nested data if list_rows.num_rows > 0: logger.debug(l, 'is a nested column') if len([x for x in cols if x['name'] == l]) == 1: table_list.append({ 'name': f'{obj_type}_{l}', 'tbl': obj_table.long_table(['id'], l) }) else: # Ignore if column doesn't exist (or has multiples) continue else: if tidy is False: logger.debug(l, 'is a normal list column') obj_table.unpack_list(l) # Unpack all dict columns if len(dict_cols) > 0 and tidy is False: for d in dict_cols: logger.debug(d, 'is a dict column') obj_table.unpack_dict(d) if tidy is not False: packed_cols = list_cols + dict_cols for p in packed_cols: if p in obj_table.columns: logger.debug(p, 'needs to be unpacked into rows') # Determine whether or not to expand based on tidy unpacked_tidy = obj_table.unpack_nested_columns_as_rows(p, expand_original=tidy) # Check if column was removed as sign it was unpacked into separate table if p not in obj_table.columns: table_list.append({ 'name': f'{obj_type}_{p}', 'tbl': unpacked_tidy }) else: obj_table = unpacked_tidy # Original table will have had all nested columns removed if len(obj_table.columns) > 1: table_list.append({'name': obj_type, 'tbl': obj_table}) return table_list def process_custom_fields(self, json_blob): # Internal method to convert custom fields responses into a list of Parsons tables # Original table & columns custom_fields = Table(json_blob) # Available On available_on = custom_fields.long_table(['id'], 'available_on') # Options options = custom_fields.long_table(['id', 'name'], 'options') return [{'name': 'custom_fields', 'tbl': custom_fields}, {'name': 'custom_fields_available', 'tbl': available_on}, {'name': 'custom_fields_options', 'tbl': options}]