Source code for parsons.ngpvan.bulk_import

"""NGPVAN Bulk Import Endpoints"""

from parsons.etl.table import Table
from parsons.utilities import cloud_storage

import logging
import uuid
import csv

logger = logging.getLogger(__name__)


[docs] class BulkImport(object): def __init__(self): pass
[docs] def get_bulk_import_resources(self): """ Get bulk import resources that available to the user. These define the types of bulk imports that you can run. These might include ``Contacts``, ``ActivistCodes``, ``ContactsActivistCodes`` and others. `Returns:` list A list of resources. """ r = self.connection.get_request("bulkImportJobs/resources") logger.info(f"Found {len(r)} bulk import resources.") return r
[docs] def get_bulk_import_job(self, job_id): """ Get a bulk import job status. `Args:` job_id : int The bulk import job id. `Returns:` dict The bulk import job """ r = self.connection.get_request(f"bulkImportJobs/{job_id}") logger.info(f"Found bulk import job {job_id}.") return r
[docs] def get_bulk_import_job_results(self, job_id): """ Get result file of a bulk upload job. This will include one row per record processed as well as the status of each. If the job results have not been generated, either due to an error in the process or the fact the job is still processing, it will return ``None``. `Args:` job_id: int The bulk import job id. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ r = self.get_bulk_import_job(job_id) logger.info(f"Bulk Import Job Status: {r['status']}") if r["status"] == "Completed": return Table.from_csv(r["resultFiles"][0]["url"]) return None
[docs] def get_bulk_import_mapping_types(self): """ Get bulk import mapping types. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request("bulkImportMappingTypes")) logger.info(f"Found {tbl.num_rows} bulk import mapping types.") return tbl
[docs] def get_bulk_import_mapping_type(self, type_name): """ Get a single bulk import mapping type. `Args:` type_name: str `Returns`: dict A mapping type json """ r = self.connection.get_request(f"bulkImportMappingTypes/{type_name}") logger.info(f"Found {type_name} bulk import mapping type.") return r
[docs] def get_bulk_import_mapping_type_fields(self, type_name, field_name): """ Get data about a field in a mapping type. `Args:` type_name: str The mapping type name field_name: str The field name `Returns:` dict A mapping type fields json """ r = self.connection.get_request(f"bulkImportMappingTypes/{type_name}/{field_name}/values") logger.info(f"Found {type_name} bulk import mapping type field values.") return r
def post_bulk_import( self, tbl, url_type, resource_type, mapping_types, description, result_fields=None, **url_kwargs, ): # Internal method to post bulk imports. # Move to cloud storage file_name = str(uuid.uuid1()) url = cloud_storage.post_file( tbl, url_type, file_path=file_name + ".zip", quoting=csv.QUOTE_ALL, **url_kwargs, ) logger.info(f"Table uploaded to {url_type}.") # Generate request json json = { "description": description, "file": { "columnDelimiter": "csv", "columns": [{"name": c} for c in tbl.columns], "fileName": file_name + ".csv", "hasHeader": "True", "hasQuotes": "True", "sourceUrl": url, }, "actions": [ { "resultFileSizeKbLimit": 5000, "resourceType": resource_type, "actionType": "loadMappedFile", "mappingTypes": mapping_types, } ], } if result_fields: result_fields = [{"name": c} for c in result_fields] json["actions"][0]["columnsToIncludeInResultsFile"] = result_fields r = self.connection.post_request("bulkImportJobs", json=json) logger.info(f"Bulk upload {r['jobId']} created.") return r["jobId"]
[docs] def bulk_apply_activist_codes(self, tbl, url_type, **url_kwargs): """ Bulk apply activist codes. The table may include the following columns. The first column must be ``vanid``. .. list-table:: :widths: 25 25 50 :header-rows: 1 * - Column Name - Required - Description * - ``vanid`` - Yes - A valid VANID primary key * - ``activistcodeid`` - Yes - A valid activist code id * - ``datecanvassed`` - No - An ISO formatted date * - ``canvassedby`` - No - A valid User ID; Required when DateCanvassed is provided * - ``contacttypeid`` - No - The method of contact. `Args:` table: Parsons table A Parsons table. url_type: str The cloud file storage to use to post the file (``S3`` or ``GCS``). See :ref:`Cloud Storage <cloud-storage>` for more details. **url_kwargs: kwargs Arguments to configure your cloud storage url type. See :ref:`Cloud Storage <cloud-storage>` for more details. `Returns:` int The bulk import job id """ return self.post_bulk_import( tbl, url_type, "ContactsActivistCodes", [{"name": "ActivistCode"}], "Activist Code Upload", **url_kwargs, )
[docs] def bulk_upsert_contacts(self, tbl, url_type, result_fields=None, **url_kwargs): """ Bulk create or update contact records. Provide a Parsons table of contact data to create or update records. .. note:: * The first column of the table must be VANID. * The other columns can be a combination of the columns listed below. The valid column names also accept permutations with underscores, spaces and capitalization (e.g. ``phonenumber`` = ``Phone_Number``). **Table Fields** .. list-table:: :widths: 500 100 10 :header-rows: 1 * - Column - Valid Column Names - Notes * - VANID - ``vanid`` - * - Voter VAN ID - ``votervanid`` - The contact's MyVoters VANID * - External ID - ``externalid``, ``id``, ``pk``, ``voterbaseid`` - An external id to be stored. * - **PII** - - * - First Name - ``fn``, ``firstname``, ``first`` - * - Middle Name - ``mn``, ``middlename``, ``middle`` - * - Last Name - ``ln``, ``lastname``, ``last`` - * - Date of Birth - ``dob``, ``dateofbirth``, ``birthdate`` - An ISO formatted date * - Sex - ``sex``, ``gender`` - * - **Physical Address** - - * - Address Line 1 - ``addressline1``, ``address1``, ``address`` - * - Address Line 2 - ``addressline2``, ``address2`` - * - Address Line 3 - ``addressline3``, ``address3`` - * - City - ``city`` - * - State Or Province - ``state``, ``st``, ``stateorprovince`` - * - Zip or Postal Code - ``ziporpostal``, ``postal``, ``postalcode``, ``zip``, ``zipcode`` - * - Country Code - ``countrycode``, ``country`` - A valid two character country code (e.g. ``US``) * - Display As Entered - ``displayasentered`` - Required values are ``Y`` and ``N``. Determines if the address is processed through address correction. * - **Phones** - - * - Cell Phone - ``cellphone``, ``cell`` - * - Cell Phone Country Code - ``cellcountrycode``, ``cellphonecountrycode`` - A valid two digit country code (e.g. ``01``) * - Home Phone - ``homephone``, ``home``, ``phone`` - * - Home Phone Country Code - ``homecountrycode``, ``homephonecountrycode`` - * - **Email** - - * - Email - ``email``, ``emailaddress`` - * - Other Email - ``otheremail``, ``email2``, ``emailaddress2`` - `Args:` table: Parsons table A Parsons table. url_type: str The cloud file storage to use to post the file. Currently only ``S3``. results_fields: list A list of fields to include in the results file. **url_kwargs: kwargs Arguments to configure your cloud storage url type. See :ref:`Cloud Storage <cloud-storage>` for more details. `Returns:` int The bulk import job id """ tbl = tbl.map_columns(CONTACTS_COLUMN_MAP, exact_match=False) return self.post_bulk_import( tbl, url_type, "Contacts", [{"name": "CreateOrUpdateContact"}], "Create Or Update Contact Records", result_fields=result_fields, **url_kwargs, )
[docs] def bulk_apply_suppressions(self, tbl, url_type, **url_kwargs): """ Bulk apply contact suppression codes. The table may include the following columns. The first column must be ``vanid``. .. list-table:: :widths: 25 25 50 :header-rows: 1 * - Column Name - Required - Description * - ``vanid`` - Yes - A valid VANID primary key * - ``suppressionid`` - Yes - A valid suppression id `Args:` table: Parsons table A Parsons table. url_type: str The cloud file storage to use to post the file (``S3`` or ``GCS``). See :ref:`Cloud Storage <cloud-storage>` for more details. **url_kwargs: kwargs Arguments to configure your cloud storage url type. See :ref:`Cloud Storage <cloud-storage>` for more details. `Returns:` int The bulk import job id """ return self.post_bulk_import( tbl, url_type, "Contacts", [{"name": "Suppressions"}], "Apply Suppressions", **url_kwargs, )
[docs] def bulk_apply_canvass_results(self, tbl, url_type, **url_kwargs): """ Bulk apply contact canvass results. The table may include the following columns. The first column must be ``vanid``. .. list-table:: :widths: 25 25 50 :header-rows: 1 * - Column Name - Required - Description * - ``vanid`` - Yes - A valid VANID primary key * - ``contacttypeid`` - Yes - Valid Contact Type ID * - ``resultid`` - Yes - Valid Contact Result ID * - ``datecanvassed`` - Yes - ISO Date Format * - ``canvassedby`` - Yes - Valid User ID * - ``phone`` - No - Attempted Phone Number * - ``countrycode`` - No - Country Code (ISO 3166-1 alpha-2) * - ``phonetypeid`` - No - Phone Type * - ``phoneoptinstatusid`` - No - SMS Opt-In Status * - ``addressid`` - No - The Contact Address ID of the address that was canvassed `Args:` table: Parsons table A Parsons table. url_type: str The cloud file storage to use to post the file (``S3`` or ``GCS``). See :ref:`Cloud Storage <cloud-storage>` for more details. **url_kwargs: kwargs Arguments to configure your cloud storage url type. See :ref:`Cloud Storage <cloud-storage>` for more details. `Returns:` int The bulk import job id """ return self.post_bulk_import( tbl, url_type, "Contacts", [{"name": "CanvassResults"}], "Apply Canvass Results", **url_kwargs, )
[docs] def bulk_apply_contact_custom_fields(self, custom_field_group_id, tbl, url_type, **url_kwargs): """ Bulk apply contact custom fields. The table may include the following columns. The first column must be ``vanid``. .. list-table:: :widths: 25 25 60 :header-rows: 1 * - Column Name - Required - Description * - ``vanid`` - Yes - A valid VANID primary key * - ***``CF{CustomFieldID}`` - Yes - At least one custom field column to be loaded associated with the provided custom_field_group_id. The column name should be a valid Custom Field ID prefixed with ``CF``, i.e. CF123. `Args:` custom_field_group_id: int Valid Custom Contact Field Group ID; must be the parent of the provided Custom Field IDs in the file. table: Parsons table A Parsons table. url_type: str The cloud file storage to use to post the file (``S3`` or ``GCS``). See :ref:`Cloud Storage <cloud-storage>` for more details. **url_kwargs: kwargs Arguments to configure your cloud storage url type. See :ref:`Cloud Storage <cloud-storage>` for more details. `Returns:` int The bulk import job id """ mapping_types = [ { "name": "ApplyContactCustomFields", "fieldValueMappings": [ { "fieldName": "CustomFieldGroupID", "staticValue": custom_field_group_id, }, ], } ] return self.post_bulk_import( tbl, url_type, "Contacts", mapping_types, "Apply Contact Custom Fields", **url_kwargs, )
# This is a column mapper that is used to accept additional column names and provide # flexibility for the user. CONTACTS_COLUMN_MAP = { "firstname": ["fn", "first"], "middlename": ["mn", "middle"], "lastname": ["ln", "last"], "dob": ["dateofbirth", "birthdate"], "sex": ["gender"], "addressline1": ["address", "addressline1", "address1"], "addressline2": ["addressline2", "address2"], "addressline3": ["addressline3", "address3"], "city": [], "stateorprovince": ["state", "st"], "ziporpostal": ["postal", "postalcode", "zip", "zipcode"], "countrycode": ["country"], "displayasentered": [], "cellphone": ["cell"], "cellphonecountrycode": ["cellcountrycode"], "phone": ["home", "homephone"], "phonecountrycode": ["phonecountrycode"], "email": ["emailaddress"], "otheremail": ["email2", "emailaddress2"], }