import gzip
import json
import logging
import time

import requests
from parsons.etl.table import Table
from parsons.utilities import check_env

logger = logging.getLogger(__name__)

class Auth0:
    """
    Instantiate the Auth0 class

    `Args:`
        client_id: str
            The Auth0 client ID. Not required if ``AUTH0_CLIENT_ID`` env variable set.
        client_secret: str
            The Auth0 client secret. Not required if ``AUTH0_CLIENT_SECRET`` env variable set.
        domain: str
            The Auth0 domain. Not required if ``AUTH0_DOMAIN`` env variable set.
    `Returns:`
        Auth0 Class
    """

    # Seconds to wait before re-polling an export job after an HTTP 429 reply.
    _RATE_LIMIT_BACKOFF_SECONDS = 10
    # Seconds to wait between polls while an export job is still running, so
    # the status endpoint is not queried in a tight loop.
    _POLL_INTERVAL_SECONDS = 1
    # Status codes accepted from the upsert calls: the update (PATCH) branch
    # answers 200, the create (POST) branch answers 201 on success.
    _UPSERT_OK_STATUSES = (200, 201)

    def __init__(self, client_id=None, client_secret=None, domain=None):
        self.base_url = f"https://{check_env.check('AUTH0_DOMAIN', domain)}"
        # Exchange the client credentials for a bearer token that is then sent
        # with every request made by this instance.
        access_token = (
            requests.post(
                f"{self.base_url}/oauth/token",
                data={
                    "grant_type": "client_credentials",  # OAuth 2.0 flow to use
                    "client_id": check_env.check("AUTH0_CLIENT_ID", client_id),
                    "client_secret": check_env.check("AUTH0_CLIENT_SECRET", client_secret),
                    "audience": f"{self.base_url}/api/v2/",
                },
            )
            .json()
            .get("access_token")
        )
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _require_status(response, ok_statuses):
        """
        Return ``response`` unchanged if its status code is acceptable.

        `Args:`
            response: Requests Response object
                The response to check.
            ok_statuses: tuple
                HTTP status codes that count as success.
        `Returns:`
            Requests Response object
        `Raises:`
            ValueError: if the status code is not in ``ok_statuses``.
        """
        if response.status_code not in ok_statuses:
            raise ValueError(f"Invalid response {response.json()}")
        return response

    @staticmethod
    def _parse_users_export(content):
        """
        Decode a gzip-compressed, newline-delimited JSON export.

        `Args:`
            content: bytes
                Raw gzip-compressed body of the export download.
        `Returns:`
            list of the decoded JSON records, one per non-blank line
        """
        text = gzip.decompress(content).decode("utf-8")
        # Split on "\n" only: str.splitlines() would also break on separators
        # that may legally appear inside a JSON string value.
        return [json.loads(line) for line in text.split("\n") if line.strip()]

    def delete_user(self, id):
        """
        Delete Auth0 user.

        `Args:`
            id: str
                The user ID of the record to delete.
        `Returns:`
            int
                The HTTP status code of the delete request.
        """
        return requests.delete(
            f"{self.base_url}/api/v2/users/{id}", headers=self.headers
        ).status_code

    def get_users_by_email(self, email):
        """
        Get Auth0 users by email.

        `Args:`
            email: str
                The user email of the record to get.
        `Returns:`
            Table Class
        `Raises:`
            requests.exceptions.ConnectionError: if the API answers with HTTP 429.
        """
        url = f"{self.base_url}/api/v2/users-by-email"
        val = requests.get(url, headers=self.headers, params={"email": email})
        if val.status_code == 429:
            raise requests.exceptions.ConnectionError(val.json()["message"])
        return Table(val.json())

    def upsert_user(
        self,
        email,
        username=None,
        given_name=None,
        family_name=None,
        app_metadata=None,
        user_metadata=None,
        connection="Username-Password-Authentication",
    ):
        """
        Upsert Auth0 users by email.

        The email is lower-cased, then looked up; the first matching user is
        updated if one exists, otherwise a new user is created.

        `Args:`
            email: str
                The user email of the record to upsert.
            username: optional str
                Username to set for user
            given_name: optional str
                Given name to set for user
            family_name: optional str
                Family name to set for user
            app_metadata: optional dict
                App metadata to set for user (an empty dict is sent if omitted)
            user_metadata: optional dict
                User metadata to set for user (an empty dict is sent if omitted)
            connection: optional str
                Name of auth0 connection (default to Username-Password-Authentication)
        `Returns:`
            Requests Response object
        `Raises:`
            ValueError: if the update/create request does not answer 200 or 201.
        """
        email = email.lower()
        obj = {
            "email": email,
            "username": username,
            "connection": connection,
            # None defaults avoid a dict shared between calls; the payload
            # still carries an empty object when nothing is supplied.
            "app_metadata": {} if app_metadata is None else app_metadata,
            "blocked": False,
            "user_metadata": {} if user_metadata is None else user_metadata,
        }
        if given_name is not None:
            obj["given_name"] = given_name
        if family_name is not None:
            obj["family_name"] = family_name
        payload = json.dumps(obj)

        existing = self.get_users_by_email(email)
        if existing.num_rows > 0:
            a0id = existing[0]["user_id"]
            ret = requests.patch(
                f"{self.base_url}/api/v2/users/{a0id}",
                headers=self.headers,
                data=payload,
            )
        else:
            ret = requests.post(
                f"{self.base_url}/api/v2/users", headers=self.headers, data=payload
            )
        return self._require_status(ret, self._UPSERT_OK_STATUSES)

    def block_user(self, user_id, connection="Username-Password-Authentication"):
        """
        Blocks an Auth0 user by user id - setting the "blocked" attribute on Auth0's API.

        `Args:`
            user_id: str
                Auth0 user id
            connection: optional str
                Name of auth0 connection (default to Username-Password-Authentication)
        `Returns:`
            Requests Response object
        `Raises:`
            ValueError: if the request does not answer 200.
        """
        payload = json.dumps({"connection": connection, "blocked": True})
        ret = requests.patch(
            f"{self.base_url}/api/v2/users/{user_id}",
            headers=self.headers,
            data=payload,
        )
        return self._require_status(ret, (200,))

    def retrieve_all_users(self, connection="Username-Password-Authentication"):
        """
        Retrieves all Auth0 users using the batch jobs endpoint.

        `Args:`
            connection: optional str
                Name of auth0 connection (default to Username-Password-Authentication)
        `Returns:`
            Table Class
                The exported users, or ``None`` if the export job could not be
                created, failed, or its status could not be read.
        """
        connection_id = self.get_connection_id(connection)
        fields = [
            {"name": n}
            for n in ["user_id", "username", "email", "user_metadata", "app_metadata"]
        ]

        # Start the users-export job
        response = requests.post(
            f"{self.base_url}/api/v2/jobs/users-exports",
            headers=self.headers,
            json={"connection_id": connection_id, "format": "json", "fields": fields},
        )
        job_id = response.json().get("id")
        if not job_id:
            logger.error("Retrieve members job creation failed")
            return None

        download_url = self._wait_for_export(job_id)
        if not download_url:
            return None

        # Download the users-export file
        users_response = requests.get(download_url)
        return Table(self._parse_users_export(users_response.content))

    def _wait_for_export(self, job_id):
        """
        Poll an export job until it finishes.

        `Args:`
            job_id: str
                ID of the users-export job to poll.
        `Returns:`
            str
                The download location reported by the completed job, or
                ``None`` if the job failed or its status could not be read.
        """
        while True:
            status_response = requests.get(
                f"{self.base_url}/api/v2/jobs/{job_id}", headers=self.headers
            )
            if status_response.status_code == 429:
                time.sleep(self._RATE_LIMIT_BACKOFF_SECONDS)
                continue
            if status_response.status_code != 200:
                # Previously this fell through to the download step with no
                # URL bound; report the failure and stop instead.
                logger.error(
                    "Retrieve members job status check failed with HTTP %s.",
                    status_response.status_code,
                )
                return None

            # Only decode the body once the status code says it is usable.
            status_data = status_response.json()
            status = status_data.get("status")
            if status == "completed":
                location = status_data.get("location")
                if not location:
                    logger.error("Retrieve members job completed without a download location.")
                return location
            if status == "failed":
                logger.error("Retrieve members job failed to complete.")
                return None

            # Job still in progress: pause before asking again.
            time.sleep(self._POLL_INTERVAL_SECONDS)

    def get_connection_id(self, connection_name):
        """
        Retrieves an Auth0 connection_id corresponding to a specific connection name

        `Args:`
            connection_name: str
                Name of auth0 connection
        `Returns:`
            Connection ID string, or ``None`` if no connection has that name
        """
        url = f"{self.base_url}/api/v2/connections"
        response = requests.get(url, headers=self.headers)
        for connection in response.json():
            if connection["name"] == connection_name:
                return connection["id"]
        return None