Source code for parsons.ngpvan.saved_lists

"""NGPVAN Saved List Endpoints"""

from parsons.etl.table import Table
from parsons.utilities import cloud_storage
import logging
import uuid
from suds.client import Client

logger = logging.getLogger(__name__)

[docs]class SavedLists(object): def __init__(self, van_connection): self.connection = van_connection
[docs] def get_saved_lists(self, folder_id=None): """ Get saved lists. `Args:` folder_id: int Filter by the id for a VAN folder. If included returns only the saved lists in the folder `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request('savedLists', params={'folderId': folder_id}))'Found {tbl.num_rows} saved lists.') return tbl
[docs] def get_saved_list(self, saved_list_id): """ Returns a saved list object. `Args:` saved_list_id: int The saved list id. `Returns:` dict """ r = self.connection.get_request(f'savedLists/{saved_list_id}')'Found saved list {saved_list_id}.') return r
[docs] def download_saved_list(self, saved_list_id): """ Download the vanids associated with a saved list. `Args:` saved_list_id: int The saved list id. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ ej = ExportJobs(self.connection) job = ej.export_job_create(saved_list_id) if isinstance(job, tuple): return job else: return Table.from_csv(job['downloadUrl'])
[docs] def upload_saved_list(self, tbl, list_name, folder_id, url_type, id_type='vanid', replace=False, **url_kwargs): """ Upload a saved list. Invalid or unmatched person id records will be ignored. Your api user must be shared on the target folder. `Args:` tbl: parsons.Table A parsons table object containing one column of person ids. list_name: str The saved list name. folder_id: int The folder id where the list will be stored. url_post_type: str The cloud file storage to use to post the file. Currently only ``S3``. id_type: str The primary key type. The options, beyond ``vanid`` are specific to your instance of VAN. replace: boolean Replace saved list if already exists. **url_kwargs: kwargs Arguments to configure your cloud storage url type. * S3 requires ``bucket`` argument and, if not stored as env variables ``aws_access_key`` and ``aws_secret_access_key``. `Returns:` dict Upload results information included the number of matched and saved records in your list. """ # Move to cloud storage file_name = str(uuid.uuid1()) url = cloud_storage.post_file(tbl, url_type, file_path=file_name + '.zip', **url_kwargs)'Table uploaded to {url_type}.') # Create XML xml = self.connection.soap_client.factory.create('CreateAndStoreSavedListMetaData') xml.SavedList._Name = list_name xml.DestinationFolder._ID = folder_id xml.SourceFile.FileName = file_name + '.csv' xml.SourceFile.FileUrl = url xml.SourceFile.FileCompression = 'zip' xml.Options.OverwriteExistingList = replace # Describe file file_desc = self.connection.soap_client.factory.create('SeparatedFileFormatDescription') file_desc._name = 'csv' file_desc.HasHeaderRow = True # Only support single column for now col = self.connection.soap_client.factory.create('Column') col.Name = id_type col.RefersTo._Path = f"Person[@PersonIDType=\'{id_type}\']" col._Index = '0' # VAN errors for this method are not particularly useful or helpful. For that reason, we # will check that the folder exists and if the list already exists.'Validating folder id and list name.') if folder_id not in [x['folderId'] for x in self.get_folders()]: raise ValueError("Folder does not exist or is not shared with API user.") if not replace: if list_name in [x['name'] for x in self.get_saved_lists(folder_id)]: raise ValueError("Saved list already exists. Set to replace argument to True or " "change list name.") # Assemble request file_desc.Columns.Column.append(col) xml.SourceFile.Format = file_desc r = Client.dict(self.connection.soap_client.service.CreateAndStoreSavedList(xml)) if r:"Uploaded {r['ListSize']} records to {r['_Name']} saved list.") return r
[docs]class Folders(object): def __init__(self, van_connection): # Some sort of test if the van_connection is not present. self.connection = van_connection
[docs] def get_folders(self): """ Get all folders owned or shared with the API user. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request('folders'))'Found {tbl.num_rows} folders.') return tbl
[docs] def get_folder(self, folder_id): """ Get a folder owned by or shared with the API user. `Args:` folder_id: int The folder id. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ r = self.connection.get_request(f'folders/{folder_id}')'Found folder {folder_id}.') return r
[docs]class ExportJobs(object): def __init__(self, van_connection): self.connection = van_connection
[docs] def get_export_job_types(self): """ Get export job types `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request('exportJobTypes'))'Found {tbl.num_rows} export job types.') return tbl
[docs] def export_job_create(self, list_id, export_type=4, webhookUrl=""): """ Creates an export job Currently, this is only used for exporting saved lists. It is recommended that you use the :meth:`saved_list_download` method instead. `Args:` list_id: int This is where you should input the list id export_type: int The export type id, which defines the columns to export webhookUrl: A webhook to include to notify as to the status of the export `Returns:` dict The export job object """ json = {"savedListId": str(list_id), "type": str(export_type), "webhookUrl": webhookUrl } r = self.connection.post_request('exportJobs', json=json)'Retrieved export job.') return r
[docs] def get_export_job(self, export_job_id): """ Get an export job. `Args:` export_job_id: int The xxport job id. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ r = self.connection.get_request(f'exportJobs/{export_job_id}')'Found export job {export_job_id}.') return r