import json
import logging
import requests
import time
from parsons.etl.table import Table
from parsons.utilities.check_env import check
# Module-level logger, named after this module; used for debug output while polling jobs.
logger = logging.getLogger(__name__)
class RedashTimeout(Exception):
    """Raised when a refreshed query is still unfinished after the configured timeout."""
class RedashQueryFailed(Exception):
    """Raised when Redash reports a failed query or an HTTP request for results fails."""
class Redash(object):
    """
    Instantiate Redash Class

    `Args:`
        base_url: str
            The base url for your redash instance (excluding the final /)
        user_api_key: str
            The user API key found in the User's profile screen
        pause_time: int
            Specify time between polling for refreshed queries (Defaults to 3 seconds)
        timeout: int
            Number of seconds to wait for a refreshed query before raising
            ``RedashTimeout``. ``0`` (the default) means never time out.
        verify: bool
            For https requests, should the certificate be verified (Defaults to True)
    `Returns:`
        Redash Class
    """

    def __init__(self,
                 base_url=None,
                 user_api_key=None,
                 pause_time=3,
                 timeout=0,  # never timeout
                 verify=True):
        # Each setting goes through ``check`` together with the name of its
        # REDASH_* environment variable; only the base url is mandatory.
        self.base_url = check('REDASH_BASE_URL', base_url)
        self.user_api_key = check('REDASH_USER_API_KEY', user_api_key, optional=True)
        self.pause = int(check('REDASH_PAUSE_TIME', pause_time, optional=True))
        self.timeout = int(check('REDASH_TIMEOUT', timeout, optional=True))
        self.verify = verify  # for https requests
        self.session = requests.Session()
        # Authenticate with the *resolved* key, so a key that did not come in
        # as an argument still ends up in the Authorization header.
        if self.user_api_key:
            self.session.headers.update({'Authorization': f'Key {self.user_api_key}'})

    def _poll_job(self, session, job, query_id):
        """
        Poll a Redash job until it reaches a terminal status.

        `Args:`
            session: requests.Session
                Session used for the polling GET requests
            job: dict
                The job description; must carry ``status`` and, while the job
                is unfinished, ``id``
            query_id: str or int
                The query id, used only for log and error messages
        `Returns:`
            The ``query_result_id`` of the completed job
        `Raises:`
            RedashTimeout: the job is still unfinished after ``self.timeout`` seconds
            RedashQueryFailed: the job ended in error, or a poll response held no job
        """
        start_secs = time.time()
        while job['status'] not in (3, 4):
            if self.timeout and start_secs + self.timeout < time.time():
                raise RedashTimeout(f'Redash timeout: {self.timeout}')
            poll_url = '{}/api/jobs/{}'.format(self.base_url, job['id'])
            response = session.get(poll_url, verify=self.verify)
            response_json = response.json()
            job = response_json.get('job')
            if not job:
                # Without a job there is nothing left to poll; fail with the
                # payload instead of looping on a dict that has no 'id'.
                raise RedashQueryFailed(
                    'Redash Query {} failed: Error NO JOB IN RESPONSE: {}'.format(
                        query_id, json.dumps(response_json)))
            logger.debug("poll url:%s id:%s status:%s err:%s",
                         poll_url, query_id, job['status'], job.get('error'))
            # Only wait when another poll is actually going to happen.
            if job['status'] not in (3, 4):
                time.sleep(self.pause)

        if job['status'] == 3:  # 3 = completed
            return job['query_result_id']
        # 4 = ERROR
        raise RedashQueryFailed(
            'Redash Query {} failed: {}'.format(query_id, job.get('error')))

    def get_fresh_query_results(self, query_id=None, params=None):
        """
        Refresh a query, wait for it to finish, and return its results as a Table.

        `Args:`
            query_id: str or int
                The query id of the query
            params: dict
                If there are values for the redash query parameters
                (described https://redash.io/help/user-guide/querying/query-parameters
                e.g. "{{datelimit}}" in the query),
                then this is a dict that will pass the parameters in the POST.
                We add the "p_" prefix for parameters, so if your query had ?p_datelimit=....
                in the url, you should just set 'datelimit' in params here.

                If you set this with REDASH_QUERY_PARAMS environment variable instead of passing
                the values, then you must include the "p_" prefixes and it should be a single
                url-encoded string as you would see it in the URL bar.
        `Returns:`
            Table Class
        `Raises:`
            RedashQueryFailed: the refresh request, the query itself, or the
                download of the results failed
            RedashTimeout: the query did not finish within the configured timeout
        """
        query_id = check('REDASH_QUERY_ID', query_id, optional=True)
        params_from_env = check('REDASH_QUERY_PARAMS', '', optional=True)
        # Prefix every parameter with "p_" and double any single quotes in its value.
        redash_params = {f'p_{k}': str(v).replace("'", "''")
                         for k, v in (params or {}).items()}

        response = self.session.post(
            f'{self.base_url}/api/queries/{query_id}/refresh?{params_from_env}',
            params=redash_params,
            verify=self.verify)
        if response.status_code != 200:
            raise RedashQueryFailed(f'Refresh failed for query {query_id}. {response.text}')

        job = response.json()['job']
        result_id = self._poll_job(self.session, job, query_id)
        if not result_id:
            # ``response`` is still the refresh response at this point.
            raise RedashQueryFailed(f'Failed getting result {query_id}. {response.text}')

        response = self.session.get(
            f'{self.base_url}/api/queries/{query_id}/results/{result_id}.csv',
            verify=self.verify)
        if response.status_code != 200:
            raise RedashQueryFailed(
                f'Failed getting results for query {query_id}. {response.text}')
        return Table.from_csv_string(response.text)

    def get_cached_query_results(self, query_id=None, query_api_key=None):
        """
        Fetch the cached results of a query (no refresh) and return them as a Table.

        `Args:`
            query_id: str or int
                The query id of the query
            query_api_key: str
                If you did not supply a user_api_key on the Redash object, then you can
                supply a query_api_key to get cached results back anonymously.
        `Returns:`
            Table Class
        `Raises:`
            RedashQueryFailed: the results request did not return HTTP 200
        """
        query_id = check('REDASH_QUERY_ID', query_id)
        query_api_key = check('REDASH_QUERY_API_KEY', query_api_key, optional=True)
        params = {}
        # The query key is only sent when there is no user key on this object.
        if not self.user_api_key and query_api_key:
            params['api_key'] = query_api_key
        response = self.session.get(f'{self.base_url}/api/queries/{query_id}/results.csv',
                                    params=params,
                                    verify=self.verify)
        if response.status_code != 200:
            raise RedashQueryFailed(
                f'Failed getting results for query {query_id}. {response.text}')
        return Table.from_csv_string(response.text)

    @classmethod
    def load_to_table(cls, refresh=True, **kwargs):
        """
        Fast classmethod so you can get the data all at once:

            tabledata = Redash.load_to_table(base_url='https://example.com',
                                             user_api_key='abc123',
                                             query_id=1001,
                                             params={'datelimit': '2020-01-01'})

        This instantiates the class and makes the appropriate query type (refresh or cached)
        based on which arguments are supplied.

        `Args:`
            base_url: str
                The base url for your redash instance (excluding the final /)
            query_id: str or int
                The query id of the query
            user_api_key: str
                The user API key found in the User's profile screen required for refresh queries
            query_api_key: str
                If you did not supply a user_api_key on the Redash object, then you can
                supply a query_api_key to get cached results back anonymously.
            pause_time: int
                Specify time between polling for refreshed queries (Defaults to 3 seconds)
            timeout: int
                Seconds to wait for a refreshed query before giving up
                (Defaults to 0, which never times out)
            verify: bool
                For https requests, should the certificate be verified (Defaults to True)
            refresh: bool
                Refresh results or cached. (Defaults to True unless a query_api_key IS supplied)
            params: dict
                For refresh queries, if there are parameters in the query,
                then this is a dict that will pass the parameters in the POST.
                We add the "p_" prefix for parameters, so if your query had ?p_datelimit=....
                in the url, you should just set 'datelimit' in params here.
        `Returns:`
            Table Class
        """
        # Forward only the constructor arguments the caller actually supplied,
        # so the constructor defaults apply to everything else.
        initargs = {a: kwargs[a]
                    for a in ('base_url', 'user_api_key', 'pause_time', 'timeout', 'verify')
                    if a in kwargs}
        obj = cls(**initargs)
        if not refresh or kwargs.get('query_api_key'):
            return obj.get_cached_query_results(kwargs.get('query_id'),
                                                kwargs.get('query_api_key'))
        return obj.get_fresh_query_results(kwargs.get('query_id'), kwargs.get('params'))