Source code for parsons.ngpvan.bulk_import

"""NGPVAN Bulk Import Endpoints"""
from parsons.etl.table import Table
from parsons.utilities import cloud_storage

import logging
import uuid
import csv

logger = logging.getLogger(__name__)


class BulkImport(object):

    def __init__(self):

        pass
    def get_bulk_import_resources(self):
        """
        Get bulk import resources that are available to the user. These define
        the types of bulk imports that you can run. These might include
        ``Contacts``, ``ActivistCodes``, ``ContactsActivistCodes`` and others.

        `Returns:`
            list
                A list of resources.
        """

        r = self.connection.get_request('bulkImportJobs/resources')
        logger.info(f'Found {len(r)} bulk import resources.')
        return r
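    # A minimal usage sketch, assuming a configured connection. In Parsons
    # this class is a mixin on the VAN connector, so it is normally reached
    # through a VAN instance (api key read from the VAN_API_KEY env variable):
    #
    #     from parsons import VAN
    #     van = VAN(db='MyVoters')
    #     resources = van.get_bulk_import_resources()
    #     # e.g. ['Contacts', 'ActivistCodes', 'ContactsActivistCodes', ...]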
    def get_bulk_import_job(self, job_id):
        """
        Get a bulk import job status.

        `Args:`
            job_id : int
                The bulk import job id.
        `Returns:`
            dict
                The bulk import job
        """

        r = self.connection.get_request(f'bulkImportJobs/{job_id}')
        logger.info(f'Found bulk import job {job_id}.')
        return r
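    # A sketch of polling a job until it finishes. The 'status' key and its
    # values are assumptions about the VAN response shape; check the bulk
    # import API docs for the exact fields:
    #
    #     import time
    #     job = van.get_bulk_import_job(job_id)
    #     while job.get('status') not in ('Completed', 'Error'):
    #         time.sleep(30)
    #         job = van.get_bulk_import_job(job_id)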
    def get_bulk_import_mapping_types(self):
        """
        Get bulk import mapping types.

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """

        tbl = Table(self.connection.get_request('bulkImportMappingTypes'))
        logger.info(f'Found {tbl.num_rows} bulk import mapping types.')
        return tbl
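    # Sketch: the result is an ordinary Parsons Table, so it can be printed
    # or exported like any other (continuing the ``van`` sketch above):
    #
    #     mapping_types = van.get_bulk_import_mapping_types()
    #     print(mapping_types)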
    def get_bulk_import_mapping_type(self, type_name):
        """
        Get a single bulk import mapping type.

        `Args:`
            type_name: str
        `Returns:`
            dict
                A mapping type json
        """

        r = self.connection.get_request(f'bulkImportMappingTypes/{type_name}')
        logger.info(f'Found {type_name} bulk import mapping type.')
        return r
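    # Sketch of looking up a single type by name; 'ActivistCode' is the
    # mapping type that bulk_apply_activist_codes() below relies on:
    #
    #     mapping_type = van.get_bulk_import_mapping_type('ActivistCode')
    #     # a dict describing the type, e.g. its fields and allowed values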
    def post_bulk_import(self, tbl, url_type, resource_type, mapping_types,
                         description, **url_kwargs):
        # Internal method to post bulk imports.

        # Move to cloud storage
        file_name = str(uuid.uuid1())
        url = cloud_storage.post_file(tbl,
                                      url_type,
                                      file_path=file_name + '.zip',
                                      quoting=csv.QUOTE_ALL,
                                      **url_kwargs)
        logger.info(f'Table uploaded to {url_type}.')

        # Generate request json
        json = {"description": description,
                "file": {
                    "columnDelimiter": 'csv',
                    "columns": [{'name': c} for c in tbl.columns],
                    "fileName": file_name + '.csv',
                    "hasHeader": "True",
                    "hasQuotes": "True",
                    "sourceUrl": url},
                "actions": [{"resultFileSizeKbLimit": 5000,
                             "resourceType": resource_type,
                             "actionType": "loadMappedFile",
                             "mappingTypes": mapping_types}]
                }

        r = self.connection.post_request('bulkImportJobs', json=json)
        logger.info(f"Bulk upload {r['jobId']} created.")
        return r['jobId']
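    # For reference, the request body built above looks roughly like this
    # for an activist code upload (values illustrative; the sourceUrl format
    # depends on the cloud storage used):
    #
    #     {"description": "Activist Code Upload",
    #      "file": {"columnDelimiter": "csv",
    #               "columns": [{"name": "vanid"}, {"name": "activistcodeid"}],
    #               "fileName": "<uuid>.csv",
    #               "hasHeader": "True",
    #               "hasQuotes": "True",
    #               "sourceUrl": "https://<bucket>.s3.amazonaws.com/<uuid>.zip"},
    #      "actions": [{"resultFileSizeKbLimit": 5000,
    #                   "resourceType": "ContactsActivistCodes",
    #                   "actionType": "loadMappedFile",
    #                   "mappingTypes": [{"name": "ActivistCode"}]}]}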
    def bulk_apply_activist_codes(self, tbl, url_type, **url_kwargs):
        """
        Bulk apply activist codes.

        The table may include the following columns. The first column
        must be ``vanid``.

        .. list-table::
            :widths: 25 25 50
            :header-rows: 1

            * - Column Name
              - Required
              - Description
            * - ``vanid``
              - Yes
              - A valid VANID primary key
            * - ``activistcodeid``
              - Yes
              - A valid activist code id
            * - ``datecanvassed``
              - No
              - An ISO formatted date
            * - ``contacttypeid``
              - No
              - The method of contact.

        `Args:`
            table: Parsons table
                A Parsons table.
            url_type: str
                The cloud file storage to use to post the file.
                Currently only ``S3``.
            **url_kwargs: kwargs
                Arguments to configure your cloud storage url type.
                    * S3 requires a ``bucket`` argument and, if not stored
                      as environment variables, ``aws_access_key`` and
                      ``aws_secret_access_key``.
        `Returns:`
            int
                The bulk import job id
        """

        return self.post_bulk_import(tbl,
                                     url_type,
                                     'ContactsActivistCodes',
                                     [{"name": "ActivistCode"}],
                                     'Activist Code Upload',
                                     **url_kwargs)
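    # An end-to-end sketch, assuming AWS credentials in the standard env
    # variables; the bucket name and ids are hypothetical:
    #
    #     from parsons import Table, VAN
    #     van = VAN(db='MyVoters')
    #     tbl = Table([{'vanid': 12345, 'activistcodeid': 678},
    #                  {'vanid': 23456, 'activistcodeid': 678}])
    #     job_id = van.bulk_apply_activist_codes(tbl, url_type='S3',
    #                                            bucket='my-bucket')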