"""NGPVAN Changed Entities"""
from parsons.etl.table import Table
import logging
import time
logger = logging.getLogger(__name__)
RETRY_RATE = 10
[docs]
class ChangedEntities(object):
def __init__(self):
pass
[docs]
def get_changed_entity_resources(self):
"""
Get changed entity resources available to the API user.
`Returns:`
list
"""
r = self.connection.get_request('changedEntityExportJobs/resources')
logger.info(f'Found {len(r)} changed entity resources.')
return r
[docs]
def get_changed_entity_resource_fields(self, resource_type):
"""
Get export fields avaliable for each changed entity resource.
`Args:`
resource_type: str
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
tbl = Table(self.connection.get_request(f'changedEntityExportJobs/fields/{resource_type}'))
logger.info(f'Found {tbl.num_rows} fields for {resource_type}.')
return tbl
[docs]
def get_changed_entities(self, resource_type, date_from, date_to=None, include_inactive=False,
requested_fields=None, custom_fields=None):
"""
Get modified records for VAN from up to 90 days in the past.
`Args:`
resource_type: str
The type of resource to export. Use the :py:meth:`~parsons.ngpvan.changed_entities.ChangedEntities.get_changed_entity_resources`
to get a list of potential entities.
date_from: str
The start date in which to search. Must be less than 90 days in the
past. Must be``iso8601`` formatted date (``2021-10-11``).
date_to: str
The end date to search. Must be less than 90 days in the
past. Must be``iso8601`` formatted date (``2021-10-11``).
include_inactive: boolean
Include inactive records
requested_fields: list
A list of optional requested fields to include. These options can be accessed through
:py:meth:`~parsons.ngpvan.changed_entities.ChangedEntities.get_changed_entity_resource_fields`
method.
custom_fields: list
A list of ids of custom fields to include in the export.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
""" # noqa: E501
json = {
"dateChangedFrom": date_from,
"dateChangedTo": date_to,
"resourceType": resource_type,
"requestedFields": requested_fields,
"requestedCustomFieldIds": custom_fields,
"fileSizeKbLimit": 100000,
"includeInactive": include_inactive
}
r = self.connection.post_request('changedEntityExportJobs', json=json)
while True:
status = self._get_changed_entity_job(r['exportJobId'])
if status['jobStatus'] in ['Pending', 'InProcess']:
logger.info('Waiting on export file.')
time.sleep(RETRY_RATE)
elif status['jobStatus'] == 'Complete':
return Table.from_csv(status['files'][0]['downloadUrl'])
else:
raise ValueError(status['message'])
def _get_changed_entity_job(self, job_id):
r = self.connection.get_request(f'changedEntityExportJobs/{job_id}')
return r