import json
import logging
import requests
import time
from parsons.etl.table import Table
from parsons.utilities.check_env import check
# Module-level logger; job polling writes its debug output through it.
logger = logging.getLogger(__name__)
class RedashTimeout(Exception):
    """Raised when polling a Redash job exceeds the configured timeout."""
class RedashQueryFailed(Exception):
    """Raised when a Redash query job fails or its results cannot be retrieved."""
class Redash(object):
    """
    Instantiate Redash Class

    `Args:`
        base_url: str
            The base url for your redash instance (excluding the final /)
        user_api_key: str
            The user API key found in the User's profile screen
        pause_time: int
            Specify time between polling for refreshed queries (Defaults to 3 seconds)
        timeout: int
            Number of seconds to keep polling a refreshed query before raising
            ``RedashTimeout`` (Defaults to 0, which never times out)
        verify: bool
            For https requests, should the certificate be verified (Defaults to True)
    `Returns:`
        Redash Class
    """

    def __init__(
        self,
        base_url=None,
        user_api_key=None,
        pause_time=3,
        timeout=0,  # never timeout
        verify=True,
    ):
        self.base_url = check("REDASH_BASE_URL", base_url)
        self.user_api_key = check("REDASH_USER_API_KEY", user_api_key, optional=True)
        self.pause = int(check("REDASH_PAUSE_TIME", pause_time, optional=True))
        self.timeout = int(check("REDASH_TIMEOUT", timeout, optional=True))
        self.verify = verify  # for https requests
        self.session = requests.Session()
        # Test the resolved key, not the raw argument, so that a key picked up
        # by ``check`` is also sent with every request on this session.
        if self.user_api_key:
            self.session.headers.update({"Authorization": f"Key {self.user_api_key}"})

    def _catch_runtime_error(self, res):
        """Raise ``RuntimeError`` unless the response status code is 200."""
        if res.status_code != 200:
            raise RuntimeError(f"Error. Status code: {res.status_code}. Reason: {res.reason}")

    def _poll_job(self, session, job, query_id):
        """
        Poll a refresh job until it reaches status 3 or 4.

        `Args:`
            session: requests.Session
                Session used to issue the polling requests.
            job: dict
                The ``job`` object returned by the refresh request; must hold
                ``status`` and ``id``.
            query_id: str or int
                The query id, used only in log and error messages.
        `Returns:`
            The ``query_result_id`` of the completed job
        `Raises:`
            RedashTimeout: if ``self.timeout`` is non-zero and that many seconds
                have passed before the job finished.
            RedashQueryFailed: if the job ends with status 4 or a poll response
                carries no ``job`` object.
        """
        finished = (3, 4)  # 3 = completed, 4 = error
        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start_secs = time.monotonic()
        while job["status"] not in finished:
            if self.timeout and start_secs + self.timeout < time.monotonic():
                raise RedashTimeout(f"Redash timeout: {self.timeout}")
            poll_url = "{}/api/jobs/{}".format(self.base_url, job["id"])
            response_json = session.get(poll_url, verify=self.verify).json()
            job = response_json.get("job")
            if not job:
                # Without a job object there is no id to poll again, so fail
                # explicitly instead of hitting a KeyError on the next lookup.
                raise RedashQueryFailed(
                    "Redash Query {} failed: Error NO JOB IN RESPONSE: {}".format(
                        query_id, json.dumps(response_json)
                    )
                )
            logger.debug(
                "poll url:%s id:%s status:%s err:%s",
                poll_url,
                query_id,
                job["status"],
                job.get("error"),
            )
            # Only wait when another poll is actually going to happen.
            if job["status"] not in finished:
                time.sleep(self.pause)

        if job["status"] == 4:  # 4 = ERROR
            raise RedashQueryFailed(
                "Redash Query {} failed: {}".format(query_id, job.get("error"))
            )
        return job["query_result_id"]  # 3 = completed

    def get_data_source(self, data_source_id):
        """
        Get a data source.

        `Args:`
            data_source_id: int or str
                ID of data source.
        `Returns`:
            Data source json object
        `Raises:`
            RuntimeError: if the response status code is not 200.
        """
        res = self.session.get(
            f"{self.base_url}/api/data_sources/{data_source_id}",
            verify=self.verify,
        )
        self._catch_runtime_error(res)
        return res.json()

    def update_data_source(self, data_source_id, name, type, dbName, host, password, port, user):
        """
        Update a data source.

        `Args:`
            data_source_id: str or int
                ID of data source.
            name: str
                Name of data source.
            type: str
                Type of data source.
            dbName: str
                Database name of data source.
            host: str
                Host of data source.
            password: str
                Password of data source.
            port: int or str
                Port of data source.
            user: str
                Username of data source.
        `Returns:`
            ``None``
        `Raises:`
            RuntimeError: if the response status code is not 200.
        """
        payload = {
            "name": name,
            "type": type,
            "options": {
                "dbname": dbName,
                "host": host,
                "password": password,
                "port": port,
                "user": user,
            },
        }
        res = self.session.post(
            f"{self.base_url}/api/data_sources/{data_source_id}",
            json=payload,
            verify=self.verify,
        )
        self._catch_runtime_error(res)

    def get_fresh_query_results(self, query_id=None, params=None):
        """
        Refresh a query, wait for the job to finish and load its CSV result
        into a Table.

        `Args:`
            query_id: str or int
                The query id of the query
            params: dict
                If there are values for the redash query parameters
                (described https://redash.io/help/user-guide/querying/query-parameters
                e.g. "{{datelimit}}" in the query),
                then this is a dict that will pass the parameters in the POST.
                We add the "p_" prefix for parameters, so if your query had ?p_datelimit=....
                in the url, you should just set 'datelimit' in params here.

                If you set this with REDASH_QUERY_PARAMS environment variable instead of passing
                the values, then you must include the "p_" prefixes and it should be a single
                url-encoded string as you would see it in the URL bar.
        `Returns:`
            Table Class
        `Raises:`
            RedashQueryFailed: if the refresh request or the result download does
                not return status 200, if the job fails, or if no result id is
                produced.
            RedashTimeout: if polling exceeds the configured timeout.
        """
        query_id = check("REDASH_QUERY_ID", query_id, optional=True)
        params_from_env = check("REDASH_QUERY_PARAMS", "", optional=True)
        # Single quotes inside values are doubled before being sent.
        redash_params = (
            {"p_%s" % k: str(v).replace("'", "''") for k, v in params.items()} if params else {}
        )

        refresh_url = f"{self.base_url}/api/queries/{query_id}/refresh"
        if params_from_env:
            # Only add a query string when the environment actually supplies one.
            refresh_url = f"{refresh_url}?{params_from_env}"

        response = self.session.post(refresh_url, params=redash_params, verify=self.verify)
        if response.status_code != 200:
            raise RedashQueryFailed(f"Refresh failed for query {query_id}. {response.text}")

        job = response.json()["job"]
        result_id = self._poll_job(self.session, job, query_id)
        if not result_id:
            raise RedashQueryFailed(f"Failed getting result {query_id}. {response.text}")

        response = self.session.get(
            f"{self.base_url}/api/queries/{query_id}/results/{result_id}.csv",
            verify=self.verify,
        )
        if response.status_code != 200:
            raise RedashQueryFailed(
                f"Failed getting results for query {query_id}. {response.text}"
            )
        return Table.from_csv_string(response.text)

    def get_cached_query_results(self, query_id=None, query_api_key=None):
        """
        Get the cached results of a query and load the CSV into a Table.

        `Args:`
            query_id: str or int
                The query id of the query
            query_api_key: str
                If you did not supply a user_api_key on the Redash object, then you can
                supply a query_api_key to get cached results back anonymously.
        `Returns:`
            Table Class
        `Raises:`
            RedashQueryFailed: if the results request does not return status 200.
        """
        query_id = check("REDASH_QUERY_ID", query_id)
        query_api_key = check("REDASH_QUERY_API_KEY", query_api_key, optional=True)
        params = {}
        # The query-level key is only used when no user key is configured.
        if not self.user_api_key and query_api_key:
            params["api_key"] = query_api_key
        response = self.session.get(
            f"{self.base_url}/api/queries/{query_id}/results.csv",
            params=params,
            verify=self.verify,
        )
        if response.status_code != 200:
            raise RedashQueryFailed(f"Failed getting results for query {query_id}. {response.text}")
        return Table.from_csv_string(response.text)

    @classmethod
    def load_to_table(cls, refresh=True, **kwargs):
        """
        Fast classmethod makes the appropriate query type (refresh or cached)
        based on which arguments are supplied.

        `Args:`
            base_url: str
                The base url for your redash instance (excluding the final /)
            query_id: str or int
                The query id of the query
            user_api_key: str
                The user API key found in the User's profile screen required for refresh queries
            query_api_key: str
                If you did not supply a user_api_key on the Redash object, then you can
                supply a query_api_key to get cached results back anonymously.
            pause_time: int
                Specify time between polling for refreshed queries (Defaults to 3 seconds)
            timeout: int
                Seconds to keep polling a refreshed query before giving up
                (Defaults to 0, which never times out)
            verify: bool
                For https requests, should the certificate be verified (Defaults to True)
            refresh: bool
                Refresh results or cached. (Defaults to True unless a query_api_key IS supplied)
            params: dict
                For refresh queries, if there are parameters in the query,
                then this is a dict that will pass the parameters in the POST.
                We add the "p_" prefix for parameters, so if your query had ?p_datelimit=....
                in the url, you should just set 'datelimit' in params here.
        `Returns:`
            Table Class
        """
        # Forward only the constructor arguments the caller actually supplied,
        # so the constructor's own defaults apply to the rest.
        initargs = {
            a: kwargs.get(a)
            for a in ("base_url", "user_api_key", "pause_time", "timeout", "verify")
            if a in kwargs
        }
        obj = cls(**initargs)
        if not refresh or kwargs.get("query_api_key"):
            return obj.get_cached_query_results(kwargs.get("query_id"), kwargs.get("query_api_key"))
        return obj.get_fresh_query_results(kwargs.get("query_id"), kwargs.get("params"))