"""NGPVAN Saved List Endpoints"""
from parsons.etl.table import Table
from parsons.utilities import cloud_storage
import logging
import uuid
from suds.client import Client
logger = logging.getLogger(__name__)
[docs]
class SavedLists(object):
    """
    Saved list operations for NGPVAN.

    All requests go through the connection object supplied at construction.
    The upload methods also call ``self.get_folders()``, which this class does
    not define; they therefore only work when this class is combined with one
    that provides it (``Folders`` in this module defines a method of that name).

    `Args:`
        van_connection:
            The connection object used for REST (``get_request`` /
            ``post_request``) and SOAP (``soap_client``) calls.
    """

    def __init__(self, van_connection):
        self.connection = van_connection

    def get_saved_lists(self, folder_id=None):
        """
        Get saved lists.

        `Args:`
            folder_id: int
                Filter by the id for a VAN folder. If included returns only
                the saved lists in the folder
        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """
        tbl = Table(self.connection.get_request('savedLists', params={'folderId': folder_id}))
        logger.info(f'Found {tbl.num_rows} saved lists.')
        return tbl

    def get_saved_list(self, saved_list_id):
        """
        Returns a saved list object.

        `Args:`
            saved_list_id: int
                The saved list id.
        `Returns:`
            dict
        """
        r = self.connection.get_request(f'savedLists/{saved_list_id}')
        logger.info(f'Found saved list {saved_list_id}.')
        return r

    def download_saved_list(self, saved_list_id):
        """
        Download the vanids associated with a saved list.

        An export job is created for the list and the CSV found at the job's
        ``downloadUrl`` is loaded into a table.

        `Args:`
            saved_list_id: int
                The saved list id.
        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options. If the export job
                request returns a tuple, that tuple is returned unchanged.
        """
        ej = ExportJobs(self.connection)
        job = ej.export_job_create(saved_list_id)

        # NOTE(review): a tuple response is passed straight back to the caller --
        # presumably an error/status result from the connection; confirm.
        if isinstance(job, tuple):
            return job
        return Table.from_csv(job['downloadUrl'])

    def _validate_upload_target(self, folder_id, list_name, allow_existing, exists_message):
        """
        Check that the destination folder is visible and the list name is usable.

        `Args:`
            folder_id: int
                The folder the list will be stored in.
            list_name: str
                The name the saved list will be given.
            allow_existing: boolean
                When true, an existing list with the same name is accepted
                (the caller intends to overwrite it).
            exists_message: str
                The error message to use when the name is already taken.
        `Raises:`
            ValueError
                If the folder is not returned by ``get_folders()``, or the name
                is already used in the folder and ``allow_existing`` is false.
        """
        if folder_id not in [x['folderId'] for x in self.get_folders()]:
            raise ValueError("Folder does not exist or is not shared with API user.")

        if allow_existing:
            return

        if list_name in [x['name'] for x in self.get_saved_lists(folder_id)]:
            raise ValueError(exists_message)

    @staticmethod
    def _build_file_load_payload(description, delimiter, columns, file_name, header, quotes,
                                 source_url, list_name, id_column, folder_id, callback_url,
                                 overwrite=None):
        """
        Build the request body for a ``fileLoadingJobs`` saved list load.

        `Args:`
            delimiter: str
                One of ``csv``, ``tab`` or ``pipe``; sent capitalized.
            columns: list
                Column names; each is sent as ``{'name': <column>}``.
            overwrite: int
                When truthy, sent as ``overwriteExistingListId`` on the action.
        `Returns:`
            dict
        """
        action = {"actionType": "LoadSavedListFile",
                  "listDescription": description,
                  "listName": list_name,
                  "personIdColumn": id_column,
                  "folderId": folder_id,
                  "personIdType": "VANID"}
        if overwrite:
            action["overwriteExistingListId"] = overwrite

        return {"description": description,
                "file": {
                    "columnDelimiter": delimiter.capitalize(),
                    "columns": [{'name': c} for c in columns],
                    "fileName": file_name,
                    "hasHeader": header,
                    "hasQuotes": quotes,
                    "sourceUrl": source_url
                },
                "actions": [action],
                "listeners": [
                    {"type": "URL",
                     "value": callback_url}]
                }

    def upload_saved_list_rest(self, tbl, url_type, folder_id, list_name,
                               description, callback_url, columns, id_column,
                               delimiter='csv', header=True, quotes=True,
                               overwrite=None, **url_kwargs):
        """
        Upload a saved list. Invalid or unmatched person id records will be ignored. Your api user
        must be shared on the target folder.

        The folder, list name and delimiter are validated before the table is
        posted to cloud storage, so a rejected request does not leave a file behind.

        `Args:`
            tbl: parsons.Table
                A parsons table object containing one column of person ids.
            url_type: str
                The cloud file storage to use to post the file. Currently only ``S3``.
            folder_id: int
                The folder id where the list will be stored.
            list_name: str
                The saved list name.
            description: str
                Description of the file upload job and the list.
            callback_url: string
                The configured HTTP listener to which successful list loads will send
                a standard webhook.
            columns: list
                A list of column names contained in the file.
            id_column : str
                The column name of the VAN ID column in the file. Must be VAN ID.
            delimiter: str
                The file delimiter used. One of ``csv``, ``tab`` or ``pipe``.
            header: boolean
                Whether or not the source file has a header row.
            quotes: boolean
                Whether or not fields are enclosed in quotation marks within each
                column of the file.
            overwrite: int
                The id of an existing saved list to replace. When set, a list
                that already has ``list_name`` in the folder is not treated as
                an error.
            **url_kwargs: kwargs
                Arguments to configure your cloud storage url type.
                    * S3 requires ``bucket`` argument and, if not stored as env variables
                      ``aws_access_key`` and ``aws_secret_access_key``.
        `Returns:`
            dict
                The response to the ``fileLoadingJobs`` request, which includes
                the ``jobId`` of the created job.
        `Raises:`
            ValueError
                If the delimiter is not supported, the folder is not visible to
                the API user, or the list name is taken and ``overwrite`` is not set.
        """
        # VAN errors for this method are not particularly useful or helpful. For that reason, we
        # check the inputs ourselves, and do so before uploading anything.
        if delimiter not in ['csv', 'tab', 'pipe']:
            raise ValueError("Delimiter must be one of 'csv', 'tab' or 'pipe'")

        logger.info('Validating folder id and list name.')
        self._validate_upload_target(
            folder_id, list_name, bool(overwrite),
            "Saved list already exists. Set overwrite "
            "argument to list ID or change list name.")

        # The archive is posted as <uuid>.zip and VAN is told the name <uuid>.csv.
        rando = str(uuid.uuid1())
        file_name = rando + '.csv'
        url = cloud_storage.post_file(tbl, url_type, file_path=rando + '.zip', **url_kwargs)
        # Drop the query string: hack around github.com/move-coop/parsons/issues/513
        url_for_van = url.split('?')[0]
        logger.info(f'Table uploaded to {url_type}.')

        payload = self._build_file_load_payload(
            description=description, delimiter=delimiter, columns=columns,
            file_name=file_name, header=header, quotes=quotes, source_url=url_for_van,
            list_name=list_name, id_column=id_column, folder_id=folder_id,
            callback_url=callback_url, overwrite=overwrite)
        logger.info(payload)

        file_load_job_response = self.connection.post_request('fileLoadingJobs', json=payload)
        job_id = file_load_job_response['jobId']
        logger.info(f'Saved list loading job {job_id} created. Reference '
                    'callback url to check for job status')
        return file_load_job_response

    def upload_saved_list(self, tbl, list_name, folder_id, url_type, id_type='vanid',
                          replace=False, **url_kwargs):
        """
        .. warning::
           .. deprecated:: 0.X Use :func:`parsons.VAN.upload_saved_list_rest` instead.

        Upload a saved list. Invalid or unmatched person id records will be ignored. Your api user
        must be shared on the target folder.

        The folder and list name are validated before the table is posted to
        cloud storage, so a rejected request does not leave a file behind.

        `Args:`
            tbl: parsons.Table
                A parsons table object containing one column of person ids.
            list_name: str
                The saved list name.
            folder_id: int
                The folder id where the list will be stored.
            url_type: str
                The cloud file storage to use to post the file. Currently only ``S3``.
            id_type: str
                The primary key type. The options, beyond ``vanid`` are specific to your
                instance of VAN.
            replace: boolean
                Replace saved list if already exists.
            **url_kwargs: kwargs
                Arguments to configure your cloud storage url type.
                    * S3 requires ``bucket`` argument and, if not stored as env variables
                      ``aws_access_key`` and ``aws_secret_access_key``.
        `Returns:`
            dict
                Upload results information included the number of matched and saved
                records in your list.
        `Raises:`
            ValueError
                If the folder is not visible to the API user, or the list name
                is taken and ``replace`` is false.
        """
        # VAN errors for this method are not particularly useful or helpful. For that reason, we
        # check that the folder exists and whether the list already exists before uploading.
        logger.info('Validating folder id and list name.')
        self._validate_upload_target(
            folder_id, list_name, bool(replace),
            "Saved list already exists. Set to replace argument to True or "
            "change list name.")

        # Move to cloud storage
        file_name = str(uuid.uuid1())
        url = cloud_storage.post_file(tbl, url_type, file_path=file_name + '.zip', **url_kwargs)
        logger.info(f'Table uploaded to {url_type}.')

        # This runtime warning may be redundant with the deprecation notice in the
        # docstring; keeping both is a style/standards decision.
        if id_type == 'vanid':
            logger.warning('The NGPVAN SOAP API is deprecated, consider using '
                           'parsons.VAN.upload_saved_list_rest if you are '
                           'uploading a list of vanids.')

        soap_factory = self.connection.soap_client.factory

        # Create XML
        xml = soap_factory.create('CreateAndStoreSavedListMetaData')
        xml.SavedList._Name = list_name
        xml.DestinationFolder._ID = folder_id
        xml.SourceFile.FileName = file_name + '.csv'
        xml.SourceFile.FileUrl = url
        xml.SourceFile.FileCompression = 'zip'
        xml.Options.OverwriteExistingList = replace

        # Describe file
        file_desc = soap_factory.create('SeparatedFileFormatDescription')
        file_desc._name = 'csv'
        file_desc.HasHeaderRow = True

        # Only support single column for now
        col = soap_factory.create('Column')
        col.Name = id_type
        col.RefersTo._Path = f"Person[@PersonIDType='{id_type}']"
        col._Index = '0'

        # Assemble request
        file_desc.Columns.Column.append(col)
        xml.SourceFile.Format = file_desc

        r = Client.dict(self.connection.soap_client.service.CreateAndStoreSavedList(xml))
        if r:
            logger.info(f"Uploaded {r['ListSize']} records to {r['_Name']} saved list.")
        return r
[docs]
class Folders(object):
    """
    Folder lookups for NGPVAN.

    `Args:`
        van_connection:
            The connection object whose ``get_request`` method is used for
            every call.
    """

    def __init__(self, van_connection):
        # TODO: add some sort of check for a missing van_connection.
        self.connection = van_connection

    def get_folders(self):
        """
        Get all folders owned or shared with the API user.

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """
        response = self.connection.get_request('folders')
        folders = Table(response)
        logger.info(f'Found {folders.num_rows} folders.')
        return folders

    def get_folder(self, folder_id):
        """
        Get a folder owned by or shared with the API user.

        `Args:`
            folder_id: int
                The folder id.
        `Returns:`
            The response of the ``folders/{folder_id}`` request, returned as
            received (it is not wrapped in a Parsons Table).
        """
        endpoint = f'folders/{folder_id}'
        folder = self.connection.get_request(endpoint)
        logger.info(f'Found folder {folder_id}.')
        return folder
[docs]
class ExportJobs(object):
    """
    Export job operations for NGPVAN.

    `Args:`
        van_connection:
            The connection object whose ``get_request`` / ``post_request``
            methods are used for every call.
    """

    def __init__(self, van_connection):
        self.connection = van_connection

    def get_export_job_types(self):
        """
        Get export job types

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """
        response = self.connection.get_request('exportJobTypes')
        job_types = Table(response)
        logger.info(f'Found {job_types.num_rows} export job types.')
        return job_types

    def export_job_create(self, list_id, export_type=4,
                          webhookUrl="https://www.nothing.com"):
        """
        Creates an export job

        Currently, this is only used for exporting saved lists. It is
        recommended that you use the :meth:`download_saved_list` method
        instead.

        `Args:`
            list_id: int
                This is where you should input the list id
            export_type: int
                The export type id, which defines the columns to export
            webhookUrl:
                A webhook to include to notify as to the status of the export
        `Returns:`
            dict
                The export job object
        """
        # Both ids are sent to the API as strings.
        payload = {
            "savedListId": str(list_id),
            "type": str(export_type),
            "webhookUrl": webhookUrl,
        }
        job = self.connection.post_request('exportJobs', json=payload)
        logger.info('Retrieved export job.')
        return job

    def get_export_job(self, export_job_id):
        """
        Get an export job.

        `Args:`
            export_job_id: int
                The export job id.
        `Returns:`
            The response of the ``exportJobs/{export_job_id}`` request,
            returned as received (it is not wrapped in a Parsons Table).
        """
        endpoint = f'exportJobs/{export_job_id}'
        job = self.connection.get_request(endpoint)
        logger.info(f'Found export job {export_job_id}.')
        return job