Source code for parsons.pdi.pdi

from parsons.pdi.flag_ids import FlagIDs
from parsons.pdi.flags import Flags
from parsons.pdi.universes import Universes
from parsons.pdi.questions import Questions
from parsons.pdi.acquisition_types import AcquisitionTypes

from parsons import Table
from parsons.utilities import check_env

from datetime import datetime, timezone
from dateutil.parser import parse
from json.decoder import JSONDecodeError
import logging
import requests


# Module-level logger named after this module; the PDI client below writes
# its request/response debug output to it.
logger = logging.getLogger(__name__)


class PDI(FlagIDs, Universes, Questions, AcquisitionTypes, Flags):
    # Client for the PDI (BlueVote) HTTP API. Endpoint groups are mixed in from
    # the parent classes; this class owns authentication (session token) and
    # the shared, paginating ``_request`` helper.

    def __init__(self, username=None, password=None, api_token=None, qa_url=False):
        """
        Instantiate the PDI class

        `Args:`
            username: str
                The username for a PDI account. Can be passed as argument or
                can be set as `PDI_USERNAME` environment variable.
            password: str
                The password for a PDI account. Can be passed as argument or
                can be set as `PDI_PASSWORD` environment variable.
            api_token: str
                The api_token for a PDI account. Can be passed as argument or
                can be set as `PDI_API_TOKEN` environment variable.
            qa_url: bool
                Defaults to False. If True, requests will be made to a sandbox
                account. NOTE: This requires separate qa credentials and api
                token.
        """
        if qa_url:
            self.base_url = "https://apiqa.bluevote.com"
        else:
            self.base_url = "https://api.bluevote.com"

        self.username = check_env.check('PDI_USERNAME', username)
        self.password = check_env.check('PDI_PASSWORD', password)
        self.api_token = check_env.check('PDI_API_TOKEN', api_token)

        super().__init__()

        # Log in eagerly so bad credentials fail at construction time.
        self._get_session_token()

    def _get_session_token(self):
        """
        Log in via ``POST {base_url}/sessions`` and cache the session.

        Stores the returned access token on ``self.session_token`` and its
        parsed expiration time on ``self.session_exp``.

        `Raises:`
            requests.HTTPError
                If the login request returns an error status.
        """
        headers = {
            "Content-Type": "application/json",
        }
        login = {
            "Username": self.username,
            "Password": self.password,
            "ApiToken": self.api_token,
        }
        res = requests.post(
            f"{self.base_url}/sessions", json=login, headers=headers)
        logger.debug(f"{res.status_code} - {res.url}")
        res.raise_for_status()

        # status_code == 200
        data = res.json()
        self.session_token = data["AccessToken"]

        session_exp = parse(data["ExpirationDate"])
        if session_exp.tzinfo is None:
            # ``_request`` compares this against an aware UTC ``now``; a naive
            # value would raise TypeError on every call.
            # NOTE(review): assumes an offset-less timestamp is UTC - confirm
            # against the PDI API docs.
            session_exp = session_exp.replace(tzinfo=timezone.utc)
        self.session_exp = session_exp

    def _clean_dict(self, dct):
        """
        Drop ``None``-valued keys from a dict (or from each item of a list).

        Lists are processed element by element; a dict loses its top-level
        keys whose value is ``None`` (nested dicts are not descended into);
        any other value is returned unchanged.
        """
        if isinstance(dct, list):
            return [self._clean_dict(obj) for obj in dct]
        if isinstance(dct, dict):
            return {k: v for k, v in dct.items() if v is not None}
        return dct

    def _request(self, url, req_type='GET', post_data=None, args=None, limit=None):
        """
        Send an authenticated request and collect paginated results.

        `Args:`
            url: str
                Full URL of the endpoint.
            req_type: str
                One of ``GET``, ``POST``, ``PUT`` or ``DELETE``.
            post_data: dict or list
                JSON body; ``None`` values are stripped before sending.
            args: dict
                Query-string parameters; ``None`` values are stripped before
                sending. The caller's dict is not modified.
            limit: int
                Maximum number of records wanted. If falsy, every page is
                fetched.

        `Returns:`
            ``None`` when the response body is empty or is not valid JSON;
            the decoded JSON as-is when it has no ``data`` key; otherwise a
            parsons ``Table`` built from the accumulated ``data`` records.

        `Raises:`
            requests.HTTPError
                If the first request or any follow-up page request returns an
                error status.
        """
        # Make sure to have a current token before we make another request
        now = datetime.now(timezone.utc)
        if now > self.session_exp:
            self._get_session_token()

        # Based on PDI docs
        # https://api.bluevote.com/docs/index
        LIMIT_MAX = 2000

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.session_token}",
        }

        request_fn = {
            "GET": requests.get,
            "POST": requests.post,
            "PUT": requests.put,
            "DELETE": requests.delete,
        }

        if limit and limit <= LIMIT_MAX:
            # Copy first so the caller's dict is never mutated.
            args = dict(args) if args else {}
            args["limit"] = limit

        args = self._clean_dict(args) if args else args
        post_data = self._clean_dict(post_data) if post_data else post_data

        res = request_fn[req_type](
            url, headers=headers, json=post_data, params=args)

        logger.debug(f"{res.url} - {res.status_code}")
        logger.debug(res.request.body)

        res.raise_for_status()

        if not res.text:
            return None

        logger.debug(res.text)

        try:
            res_json = res.json()
        except ValueError:
            # Every JSON decode error requests can raise (stdlib json,
            # simplejson, requests' own wrapper) is a ValueError subclass.
            return None

        # Only a JSON object can carry the paginated "data" envelope; hand
        # anything else (list, scalar, null) straight back to the caller.
        if not isinstance(res_json, dict) or "data" not in res_json:
            return res_json

        total_count = res_json.get("totalCount") or 0
        data = res_json["data"]

        # Without a limit we want everything the server reports; with one we
        # stop as soon as we have enough.
        total_need = min(limit, total_count) if limit else total_count

        page_args = dict(args) if args else {}
        # Start at page 2 since we already got page 1
        cursor = 2
        while len(data) < total_need:
            page_args["cursor"] = cursor
            if limit:
                page_args["limit"] = min(LIMIT_MAX, total_need - len(data))
            else:
                page_args["limit"] = LIMIT_MAX

            res = request_fn[req_type](
                url, headers=headers, json=post_data, params=page_args)
            res.raise_for_status()

            page = res.json().get("data")
            if not page:
                # The server has nothing more to give even though totalCount
                # said otherwise; stop instead of looping forever.
                logger.warning(
                    f"Expected {total_need} records from {url} "
                    f"but only received {len(data)}.")
                break

            data.extend(page)
            cursor += 1

        return Table(data)