Source code for parsons.ngpvan.changed_entities

"""NGPVAN Changed Entities"""

from parsons.etl.table import Table
import logging
import time

logger = logging.getLogger(__name__)

RETRY_RATE = 10


[docs] class ChangedEntities(object): def __init__(self): pass
[docs] def get_changed_entity_resources(self): """ Get changed entity resources available to the API user. `Returns:` list """ r = self.connection.get_request("changedEntityExportJobs/resources") logger.info(f"Found {len(r)} changed entity resources.") return r
[docs] def get_changed_entity_resource_fields(self, resource_type): """ Get export fields avaliable for each changed entity resource. `Args:` resource_type: str `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request(f"changedEntityExportJobs/fields/{resource_type}")) logger.info(f"Found {tbl.num_rows} fields for {resource_type}.") return tbl
[docs] def get_changed_entities( self, resource_type, date_from, date_to=None, include_inactive=False, requested_fields=None, custom_fields=None, ): """ Get modified records for VAN from up to 90 days in the past. `Args:` resource_type: str The type of resource to export. Use the :py:meth:`~parsons.ngpvan.changed_entities.ChangedEntities.get_changed_entity_resources` to get a list of potential entities. date_from: str The start date in which to search. Must be less than 90 days in the past. Must be``iso8601`` formatted date (``2021-10-11``). date_to: str The end date to search. Must be less than 90 days in the past. Must be``iso8601`` formatted date (``2021-10-11``). include_inactive: boolean Include inactive records requested_fields: list A list of optional requested fields to include. These options can be accessed through :py:meth:`~parsons.ngpvan.changed_entities.ChangedEntities.get_changed_entity_resource_fields` method. custom_fields: list A list of ids of custom fields to include in the export. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ # noqa: E501 json = { "dateChangedFrom": date_from, "dateChangedTo": date_to, "resourceType": resource_type, "requestedFields": requested_fields, "requestedCustomFieldIds": custom_fields, "fileSizeKbLimit": 100000, "includeInactive": include_inactive, } r = self.connection.post_request("changedEntityExportJobs", json=json) while True: status = self._get_changed_entity_job(r["exportJobId"]) if status["jobStatus"] in ["Pending", "InProcess"]: logger.info("Waiting on export file.") time.sleep(RETRY_RATE) elif status["jobStatus"] == "Complete": return Table.from_csv(status["files"][0]["downloadUrl"]) else: raise ValueError(status["message"])
def _get_changed_entity_job(self, job_id): r = self.connection.get_request(f"changedEntityExportJobs/{job_id}") return r