Source code for parsons.ngpvan.targets

"""NGPVAN Target Endpoints"""

from parsons.etl.table import Table
import logging
import json
import requests

logger = logging.getLogger(__name__)


class TargetsFailed(Exception):
    pass


[docs]class Targets(object): def __init__(self, van_connection): self.connection = van_connection def obj_dict(obj): return obj.__dict__
[docs] def get_targets(self): """ Get targets. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ tbl = Table(self.connection.get_request('targets')) logger.info(f'Found {tbl.num_rows} targets.') return tbl
[docs] def get_target(self, target_id): """ Get a single target. `Args:` target_id : int The target id. `Returns:` dict The target """ r = self.connection.get_request(f'targets/{target_id}') logger.info(f'Found target {target_id}.') return r
[docs] def get_target_export(self, export_job_id): """ Get specific target export job id's status. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ response = self.connection.get_request(f'targetExportJobs/{export_job_id}') json_string = json.dumps(response) json_obj = json.loads(json_string) for i in json_obj: job_status = i['jobStatus'] if job_status == 'Complete': for j in json_obj: csv = j['file']['downloadUrl'] response_csv = requests.get(csv) return Table.from_csv_string(response_csv.text) elif job_status == 'Pending' or job_status == 'InProcess': logger.info(f'Target export job is pending or in process for {export_job_id}.') else: raise TargetsFailed(f'Target export failed for {export_job_id}')
[docs] def create_target_export(self, target_id, webhook_url=None): """ Create new target export job `Args:` target_id : int The target id the export job is creating for. `Returns:` dict The target export job ID """ target_export = { 'targetId': target_id } r = self.connection.post_request('targetExportJobs', json=target_export) logger.info(f'Created new target export job for {target_id}.') return r