# Source code for parsons.ngpvan.scores

"""NGPVAN Score Endpoints"""

from parsons.etl.table import Table
from parsons.utilities import cloud_storage
import uuid
import logging
import petl

logger = logging.getLogger(__name__)


class Scores(object):
    """
    Wrapper around the NGPVAN ``scores``, ``scoreUpdates`` and score-loading
    (``fileLoadingJobs``) endpoints.

    `Args:`
        van_connection:
            The connection object used for every request. The methods of this
            class call its ``get_request``, ``patch_request`` and
            ``post_request`` methods.
    """

    def __init__(self, van_connection):

        self.connection = van_connection

    def get_scores(self):
        """
        Get all scores.

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """

        tbl = Table(self.connection.get_request('scores'))
        logger.info(f'Found {tbl.num_rows} scores.')
        return tbl

    def get_score(self, score_id):
        """
        Get an individual score.

        `Args:`
            score_id: int
                The score id
        `Returns:`
            dict
        """

        r = self.connection.get_request(f'scores/{score_id}')
        logger.info(f'Found score {score_id}.')
        return r

    def get_score_updates(self, created_before=None, created_after=None, score_id=None):
        """
        Get score updates.

        `Args:`
            created_before: str
                Filter score updates to those created before date. Use "YYYY-MM-DD"
                format.
            created_after: str
                Filter score updates to those created after date. Use "YYYY-MM-DD"
                format.
            score_id: int
                Filter score updates to a single score. Sent as the ``scoreId``
                request parameter.
        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """

        params = {'createdBefore': created_before,
                  'createdAfter': created_after,
                  'scoreId': score_id}

        tbl = Table(self.connection.get_request('scoreUpdates', params=params))

        # The nested columns only exist when at least one row came back.
        if tbl.num_rows:
            tbl.unpack_dict('updateStatistics', prepend=False)
            tbl.unpack_dict('score', prepend=False)

        logger.info(f'Found {tbl.num_rows} score updates.')
        return tbl

    def get_score_update(self, score_update_id):
        """
        Get a score update object

        `Args:`
            score_update_id : int
                The score update id
        `Returns:`
            dict
        """

        r = self.connection.get_request(f'scoreUpdates/{score_update_id}')
        logger.info(f'Returning score update {score_update_id}.')
        return r

    def update_score_status(self, score_update_id, status):
        """
        Change the status of a score update object. This end point is used to
        approve a score loading job.

        `Args:`
            score_update_id: str
                The score update id
            status: str
                One of 'pending approval', 'approved', 'disapproved',
                'canceled'
        `Returns:`
            The result of the underlying patch request, passed through
            unchanged.
        `Raises:`
            ValueError
                If ``status`` is not one of the accepted values.
        """

        if status not in ['pending approval', 'approved', 'disapproved', 'canceled']:
            raise ValueError("Valid inputs for status are, 'pending approval', "
                             "'approved','disapproved','canceled'")

        # The request uses 'PendingApproval' for the two-word status; the other
        # statuses are simply capitalized.
        if status == 'pending approval':
            status = 'PendingApproval'
        else:
            status = status.capitalize()

        json = {"loadStatus": status}

        r = self.connection.patch_request(f'scoreUpdates/{score_update_id}', json=json)
        logger.info(f'Score {score_update_id} status updated to {status}.')
        return r

    def upload_scores(self, tbl, config, url_type, id_type='vanid', email=None, auto_approve=True,
                      approve_tolerance=.1, **url_kwargs):
        """
        Upload scores. Use to create or overwrite scores. Multiple score loads
        should be configured in a single call. [1]_

        `Args:`
            tbl: object
                A parsons.Table object. The table must contain the scores and first column in the
                table must contain the primary key (e.g. vanid).
            config: list
                The score configuration. A list of dictionaries in which you specify the
                following

                .. list-table::
                    :widths: 20 80
                    :header-rows: 0

                    * - ``score_column``
                      - The name of the column where the score is housed.
                    * - ``score_id``
                      - The score slot id.

                Example:

                .. highlight:: python
                .. code-block:: python

                  [{'score_id' : int, 'score_column': str},
                   {'score_id' : int, 'score_column': str}]

            url_type: str
                The cloud file storage to use to post the file. See
                :ref:`Cloud Storage <cloud-storage>` for more details.
            id_type: str
                The type of the primary key held in the first column of the table
                (sent as ``personIdType``). Defaults to ``vanid``.
            email: str
                An email address to send job load status updates.
            auto_approve: boolean
                If the scores are within the expected tolerance of deviation from the
                average values provided, then score will be automatically approved.
            approve_tolerance: float
                The deviation from the average scores allowed in order to automatically
                approve the score. Maximum of .1.
            **url_kwargs: kwargs
                Arguments to configure your cloud storage url type. See
                :ref:`Cloud Storage <cloud-storage>` for more details.
        `Returns:`
            int
               The score load job id.
        `Raises:`
            KeyError
                If an entry of ``config`` lacks ``score_column`` or ``score_id``.
                This is raised before anything is uploaded.

        .. [1] NGPVAN asks that you load multiple scores in a single call to reduce the load
           on their servers.
        """

        # Configure each score first: a malformed config entry must fail here,
        # before the table has been pushed to cloud storage.
        actions = []
        for score in config:
            action = {"actionType": "score",
                      "personIdColumn": tbl.columns[0],
                      "personIdType": id_type,
                      "scoreColumn": score['score_column'],
                      "scoreId": score['score_id']}

            if auto_approve:
                # The approval criteria compare against the mean of the
                # column that is being loaded.
                average = petl.stats(tbl.table, score['score_column']).mean
                action['approvalCriteria'] = {"average": average,
                                              "tolerance": approve_tolerance}

            actions.append(action)

        # Move to cloud storage
        file_name = str(uuid.uuid1())
        url = cloud_storage.post_file(tbl, url_type, file_path=file_name + '.zip', **url_kwargs)
        logger.info(f'Table uploaded to {url_type}.')

        # Generate request. The file is posted as "<name>.zip" and declared in
        # the request as "<name>.csv".
        json = {"description": 'A description',
                "file": {
                    "columnDelimiter": 'csv',
                    "columns": [{'name': c} for c in tbl.columns],
                    "fileName": file_name + '.csv',
                    "hasHeader": "True",
                    "hasQuotes": "False",
                    "sourceUrl": url},
                "actions": actions
                }

        # Add email listener
        if email:
            json['listeners'] = [{"type": "EMAIL", 'value': email}]

        # Upload scores
        r = self.connection.post_request('fileLoadingJobs', json=json)
        logger.info(f"Scores job {r['jobId']} created.")
        return r['jobId']
class FileLoadingJobs(object):
    """
    Deprecated helpers that create NGPVAN file loading jobs for scores by
    posting to the ``fileLoadingJobs`` endpoint.

    `Args:`
        van_connection:
            The connection object used for the requests. Its ``post_request``
            method is called by the methods of this class.
    """

    def __init__(self, van_connection):

        self.connection = van_connection

    def _build_job(self, file_name, file_url, columns, delimiter, header, quotes,
                   description, email):
        # Build the part of the request shared by both loaders: description,
        # file specification, an empty action list and the optional listener.

        columns = [{'name': c} for c in columns]

        # To Do: Validate that it is a .zip file. Not entirely sure if this is possible
        # as some urls might not end in ".zip".

        if delimiter not in ['csv', 'tab', 'pipe']:
            raise ValueError("Delimiter must be one of 'csv', 'tab' or 'pipe'")

        json = {"description": description or 'A description',
                "file": {
                    "columnDelimiter": delimiter.capitalize(),
                    "columns": columns,
                    "fileName": file_name,
                    "hasHeader": header,
                    "hasQuotes": quotes,
                    "sourceUrl": file_url
                },
                "actions": []
                }

        # Only register an email listener when there is an address to notify.
        if email:
            json["listeners"] = [{"type": "EMAIL", "value": email}]

        return json

    def create_file_load(self, file_name, file_url, columns, id_column, id_type,
                         score_id, score_column, delimiter='csv', header=True, quotes=True,
                         description=None, email=None, auto_average=None,
                         auto_tolerance=None):
        """
        .. warning::
           .. deprecated:: 0.7 Use :func:`parsons.VAN.upload_scores` instead.

        Loads a file. Only used for loading scores at this time. Scores must be
        compressed using `zip`.

        `Args:`
            file_name: str
                The name of the file contained in the zip file.
            file_url: str
                The url path to directly download the file. Can also be a path to an FTP site.
            columns: list
                A list of column names contained in the file.
            id_column: str
                The column name of the id column in the file.
            id_type: str
                A valid primary key, such as `VANID` or `DWID`. Varies by VAN instance.
            score_id: int
                The score slot id
            score_column: str
                The column holding the score
            delimiter: str
                The file delimiter used. One of ``csv``, ``tab`` or ``pipe``.
            header: boolean
                Sent as ``hasHeader`` in the file specification.
            quotes: boolean
                Sent as ``hasQuotes`` in the file specification.
            description: str
                A description of the job. ``A description`` is sent when omitted.
            email: str
                A valid email address in which file loading status will be sent.
            auto_average: float
                The average of scores to be loaded.
            auto_tolerance: float
                The fault tolerance of the VAN calculated average compared to the
                ``auto_average``. The tolerance must be less than 10% of the difference
                between the maximum and minimum possible acceptable values of the score.
        `Returns:`
            The ``jobId`` of the created file loading job.
        `Raises:`
            ValueError
                If ``delimiter`` is not one of the accepted values.
        """

        json = self._build_job(file_name, file_url, columns, delimiter, header, quotes,
                               description, email)

        action = {"actionType": "score",
                  "personIdColumn": id_column,
                  "personIdType": id_type,
                  "scoreColumn": score_column,
                  "scoreId": score_id}

        # Approval criteria belong to the action itself. Compare against None
        # so that an average of 0 is still honoured.
        if auto_average is not None and auto_tolerance is not None:
            action["approvalCriteria"] = {"average": auto_average,
                                          "tolerance": auto_tolerance}

        json["actions"].append(action)

        r = self.connection.post_request('fileLoadingJobs', json=json)['jobId']
        logger.info(f'Score loading job {r} created.')
        return r

    def create_file_load_multi(self, file_name, file_url, columns, id_column, id_type,
                               score_map, delimiter='csv', header=True, quotes=True,
                               description=None, email=None):
        """
        .. warning::
           .. deprecated:: 0.7 Use :func:`parsons.VAN.upload_scores` instead.

        An iteration of the :meth:`create_file_load` method that allows you to load
        multiple scores at the same time.

        `Args:`
            file_name : str
                The name of the file contained in the zip file.
            file_url : str
                The url path to directly download the file. Can also be a path to an FTP site.
            columns: list
                A list of column names contained in the file.
            id_column : str
                The column name of the id column in the file.
            id_type : str
                A valid primary key, such as `VANID` or `DWID`. Varies by VAN instance.
            score_map : list
                A list of dicts that adheres to the following syntax

                .. highlight:: python
                .. code-block:: python

                    [{'score_id' : int,
                      'score_column': str,
                      'auto_average': float,
                      'auto_tolerance': float }]

            delimiter: str
                The file delimiter used. One of ``csv``, ``tab`` or ``pipe``.
            header: boolean
                Sent as ``hasHeader`` in the file specification.
            quotes: boolean
                Sent as ``hasQuotes`` in the file specification.
            description: str
                A description of the job. ``A description`` is sent when omitted.
            email: str
                A valid email address in which file loading status will be sent.
        `Returns:`
            The file load job id
        `Raises:`
            ValueError
                If ``delimiter`` is not one of the accepted values.
            KeyError
                If an entry of ``score_map`` lacks one of the four keys shown above.
        """

        json = self._build_job(file_name, file_url, columns, delimiter, header, quotes,
                               description, email)

        json['actions'] = [
            {"actionType": "score",
             "personIdColumn": id_column,
             "personIdType": id_type,
             "scoreColumn": score['score_column'],
             "scoreId": score['score_id'],
             "approvalCriteria": {
                 "average": score['auto_average'],
                 "tolerance": score['auto_tolerance']
             }}
            for score in score_map
        ]

        r = self.connection.post_request('fileLoadingJobs', json=json)['jobId']
        logger.info(f'Score loading job {r} created.')
        return r