Source code for parsons.catalist.catalist

"""Utilities for working with the Catalist Match API

Install dependencies with `pip install parsons[catalist]`
"""

import base64
import logging
import os
import tempfile
import time
import urllib
from typing import Optional, Union, Dict, List
from zipfile import ZipFile

from parsons.etl import Table
from parsons.sftp import SFTP
from parsons.utilities.oauth_api_connector import OAuth2APIConnector

logger = logging.getLogger(__name__)


class CatalistMatch:
    """Connector for working with the Catalist Match API.

    This API allows a trusted third party to submit new files for processing,
    and/or reprocess existing files. It also allows retrieval of processing
    status. Initial setup of template(s) via the M Tool UI will be required.

    The Catalist Match tool requires OAuth2.0 client credentials for the API as
    well as credentials for accessing the Catalist sftp bucket. Each Catalist
    client is given their own bucket alias named after a tree species, used for
    constructing the filepath within the sftp bucket. Accessing the Catalist
    sftp bucket and Match API both require the source IP address to be
    explicitly white-listed by Catalist.

    Example usage:

    ```
    tbl = Table.from_csv(...)
    client = CatalistMatch(...)
    match_result = client.match(tbl)
    ```

    Note that matching can take from 10 minutes up to 6 hours or longer to
    complete, so you may want to think strategically about how to await
    completion without straining your compute resources on idling. To separate
    submitting the job and fetching the result:

    ```
    tbl = Table.from_csv(...)
    client = CatalistMatch(...)
    response = client.upload(tbl)
    match_result = client.await_completion(response["id"])
    ```
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        sftp_username: str,
        sftp_password: str,
        client_audience: Optional[str] = None,
    ) -> None:
        self.client_id = client_id
        self.client_secret = client_secret
        self.connection = OAuth2APIConnector(
            "https://api.catalist.us/mapi/",
            client_id=client_id,
            client_secret=client_secret,
            authorization_kwargs={"audience": client_audience or "catalist_api_m_prod"},
            token_url="https://auth.catalist.us/oauth/token",
            auto_refresh_url="https://auth.catalist.us/oauth/token",
        )
        self.sftp = SFTP("t.catalist.us", sftp_username, sftp_password)

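    # Illustrative sketch (not part of the library): constructing the client from
    # environment variables. The variable names below are hypothetical; the
    # connector itself does not read credentials from the environment.
    #
    #     client = CatalistMatch(
    #         client_id=os.environ["CATALIST_CLIENT_ID"],
    #         client_secret=os.environ["CATALIST_CLIENT_SECRET"],
    #         sftp_username=os.environ["CATALIST_SFTP_USERNAME"],
    #         sftp_password=os.environ["CATALIST_SFTP_PASSWORD"],
    #     )
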
    def load_table_to_sftp(
        self, table: Table, input_subfolder: Optional[str] = None
    ) -> str:
        """Load table to Catalist sftp bucket as gzipped CSV for matching.

        If input_subfolder is specified, the file will be uploaded to a subfolder
        of the myUploads directory in the SFTP server.

        `Args:`
            table: Table
                Parsons Table for matching. "first_name" and "last_name" columns
                are required. Optional columns for matching: middle_name,
                name_suffix, addr1, addr2, city, state, zip, phone, email,
                gender_tomatch, dob, dob_year, matchbackid.
            input_subfolder: str
                Optional. If specified, the file will be uploaded to a subfolder
                of the myUploads directory in the SFTP server.
        """
        local_path = table.to_csv(temp_file_compression="gzip")
        hashed_name = hash(time.time())
        remote_path_parts = ["myUploads", f"{hashed_name}.csv.gz"]

        if input_subfolder:
            if input_subfolder not in self.sftp.list_directory("/myUploads/"):
                self.sftp.make_directory("/myUploads/" + input_subfolder)
            remote_path_parts.insert(1, input_subfolder)

        remote_path = "/".join(remote_path_parts)
        self.sftp.put_file(local_path, remote_path)

        # Loads to the Catalist SFTP bucket are expected in the client's uploads
        # bucket, so we don't need to explicitly include that part of the path
        result = f"file://{remote_path.replace('myUploads/', '')}"
        return result

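    # Illustrative sketch (assumes a configured `client`): uploading a table
    # directly and inspecting the returned sftp URL, which upload() later
    # base64-encodes into the request path. The subfolder name is hypothetical.
    #
    #     tbl = Table([{"first_name": "Jane", "last_name": "Doe"}])
    #     sftp_url = client.load_table_to_sftp(tbl, input_subfolder="june_2024")
    #     # sftp_url looks like "file://june_2024/<hashed name>.csv.gz"
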
    def match(
        self,
        table: Table,
        export: bool = False,
        description: Optional[str] = None,
        export_filename_suffix: Optional[str] = None,
        input_subfolder: Optional[str] = None,
        copy_to_sandbox: bool = False,
        static_values: Optional[Dict[str, Union[str, int]]] = None,
    ) -> Table:
        """Load table to the Catalist Match API and return the matched table.

        This method blocks until the match completes, which can take from 10
        minutes to 6 hours or more depending on concurrent traffic.

        `Args:`
            table: Table
                Parsons Table for matching. "first_name" and "last_name" columns
                are required. Optional columns for matching: middle_name,
                name_suffix, addr1, addr2, city, state, zip, phone, email,
                gender_tomatch, dob, dob_year, matchbackid.
            export: bool
                Defaults to False.
            description: str
                Optional. Description for the match job.
            export_filename_suffix: str
                Optional. Adds a suffix to the result filename in the SFTP server.
            input_subfolder: str
                Optional. Adds a prefix to the filepath of the uploaded input file
                in the SFTP server.
            copy_to_sandbox: bool
                Defaults to False.
            static_values: dict
                Optional. Any included values are mapped to every row of the
                input table.
        """
        response = self.upload(
            table=table,
            export=export,
            description=description,
            export_filename_suffix=export_filename_suffix,
            input_subfolder=input_subfolder,
            copy_to_sandbox=copy_to_sandbox,
            static_values=static_values,
        )
        result = self.await_completion(response["id"])
        return result

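    # Illustrative sketch (assumes a configured `client`): a blocking match that
    # also exports the results and stamps every input row with a static value.
    # The static_values key below is hypothetical and must correspond to a field
    # configured in your Catalist template.
    #
    #     matched = client.match(
    #         tbl,
    #         export=True,
    #         description="June canvass universe",
    #         static_values={"source": "june_canvass"},
    #     )
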
    def upload(
        self,
        table: Table,
        template_id: str = "48827",
        export: bool = False,
        description: Optional[str] = None,
        export_filename_suffix: Optional[str] = None,
        input_subfolder: Optional[str] = None,
        copy_to_sandbox: bool = False,
        static_values: Optional[Dict[str, Union[str, int]]] = None,
    ) -> dict:
        """Load table to the Catalist Match API and return a response with job metadata.

        `Args:`
            table: Table
                Parsons Table for matching. "first_name" and "last_name" columns
                are required. Optional columns for matching: middle_name,
                name_suffix, addr1, addr2, city, state, zip, phone, email,
                gender_tomatch, dob, dob_year, matchbackid.
            template_id: str
                Defaults to "48827", currently the only available template for
                working with the Match API.
            export: bool
                Defaults to False.
            description: str
                Optional. Description for the match job.
            export_filename_suffix: str
                Optional. Adds a suffix to the result filename in the SFTP server.
            input_subfolder: str
                Optional. Adds a prefix to the filepath of the uploaded input file
                in the SFTP server.
            copy_to_sandbox: bool
                Defaults to False.
            static_values: dict
                Optional. Any included values are mapped to every row of the
                input table.
        """
        self.validate_table(table, template_id)

        # Upload table to a temporary location in the Catalist SFTP bucket
        sftp_file_path = self.load_table_to_sftp(table, input_subfolder)
        sftp_file_path_encoded = base64.b64encode(
            sftp_file_path.encode("ascii")
        ).decode("ascii")

        if export:
            action = "export%2Cpublish"
        else:
            action = "publish"

        # Create endpoint using options
        endpoint_params = [
            "upload",
            "template",
            template_id,
            "action",
            action,
            "url",
            sftp_file_path_encoded,
        ]
        if description:
            endpoint_params.extend(["description", description])
        endpoint = "/".join(endpoint_params)

        # Assemble query parameters
        query_params: Dict[str, Union[str, int]] = {
            "token": self.connection.token["access_token"]
        }
        if copy_to_sandbox:
            query_params["copyToSandbox"] = "true"
        if static_values:
            query_params.update(static_values)
        if export_filename_suffix:
            query_params["subClientName"] = export_filename_suffix

        logger.debug(f"Executing request to endpoint {self.connection.uri + endpoint}")
        endpoint = endpoint + "?" + urllib.parse.urlencode(query_params)
        response = self.connection.get_request(endpoint)
        result = response[0]
        return result

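    # Illustrative sketch (assumes a configured `client`): submit the job now and
    # poll for it later, e.g. from a separate scheduled task, using the job id
    # from the upload response.
    #
    #     response = client.upload(tbl, description="overnight match")
    #     job_id = response["id"]
    #     # ... persist job_id somewhere, then later:
    #     matched = client.await_completion(job_id)
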
    def action(
        self,
        file_ids: Union[str, List[str]],
        match: bool = False,
        export: bool = False,
        export_filename_suffix: Optional[str] = None,
        copy_to_sandbox: bool = False,
    ) -> List[dict]:
        """Perform actions on existing files.

        All files must be in Finished status (if the action requested is publish)
        and must be mapped against the same template. The request returns as soon
        as the action has been queued.

        `Args:`
            file_ids: str or List[str]
                One or more file ids (found in the `id` key of responses from the
                upload() or status() methods).
            match: bool
                Optional. Defaults to False. If True, will initiate matching.
            export: bool
                Optional. Defaults to False. If True, will initiate export.
            export_filename_suffix: str
                Optional. If included, adds a suffix to the filepath of the
                exported file in the SFTP server.
            copy_to_sandbox: bool
                Defaults to False.
        """
        actions = ["publish"]
        if match:
            actions.append("match")
        if export:
            actions.append("export")
        action = urllib.parse.quote(",".join(actions))
        endpoint_params = ["action", action]

        if isinstance(file_ids, list):
            encoded_files = urllib.parse.quote(",".join(file_ids))
        else:
            encoded_files = file_ids
        endpoint_params.extend(["file", encoded_files])
        endpoint = "/".join(endpoint_params)

        logger.debug(f"Executing request to endpoint {self.connection.uri + endpoint}")

        query_params = {"token": self.connection.token["access_token"]}
        if copy_to_sandbox:
            query_params["copyToSandbox"] = "true"
        if export_filename_suffix:
            query_params["subClientName"] = export_filename_suffix
        endpoint = endpoint + "?" + urllib.parse.urlencode(query_params)

        result = self.connection.get_request(endpoint)
        return result

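    # Illustrative sketch (assumes a configured `client`): re-queue an export for
    # two previously uploaded files that are already in Finished status. The file
    # ids below are hypothetical placeholders.
    #
    #     client.action(
    #         ["1234567", "1234568"],
    #         export=True,
    #         export_filename_suffix="rerun",
    #     )
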
    def status(self, id: str) -> dict:
        """Check status of a match job."""
        endpoint = "/".join(["status", "id", id])
        query_params = {"token": self.connection.token["access_token"]}
        result = self.connection.get_request(endpoint, params=query_params)
        return result

    def await_completion(self, id: str, wait: int = 30) -> Table:
        """Await completion of a match job and return the matches when ready.

        This method polls the status of a match job on a timer until the job is
        complete. By default, it polls once every 30 seconds. Note that match job
        completion can take from 10 minutes up to 6 hours or more depending on
        concurrent traffic, so consider your strategy for polling for completion.
        """
        while True:
            response = self.status(id)
            status = response["process"]["processState"]
            if status in ("Finished", "Error", "Stopped", "Exception"):
                logger.info(f"Job {id} is complete with status {status}.")
                break

            logger.info(f"Job {id} has status {status}, awaiting completion.")
            time.sleep(wait)

        result = self.load_matches(id)
        return result

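    # Illustrative sketch (assumes a configured `client` and a known job id): a
    # one-off status check versus a more patient poll with a longer interval for
    # a large file.
    #
    #     state = client.status(job_id)["process"]["processState"]
    #     matched = client.await_completion(job_id, wait=300)  # poll every 5 minutes
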
    def load_matches(self, id: str) -> Table:
        """Take a completed job id, download and open the match file as a Table.

        The result is a Table with all the original columns along with the
        columns 'DWID', 'CONFIDENCE', 'ZIP9', and 'STATE'. The original column
        headers are prepended with 'COL#-'.
        """
        # Validate that the job is complete
        response = self.status(str(id))
        status = response["process"]["processState"]
        if status == "Finished":
            logger.info(f"Validated that job {id} completed successfully.")
        else:
            err_msg = "Failed to successfully run match job. "
            if status == "Error":
                err_msg += "Internal error. "
            elif status == "Stopped":
                err_msg += "Probably stopped by Catalist staff. Will be rerun. "
            elif status == "Exception":
                err_msg += (
                    "Error with data. Catalist will have been notified and "
                    "will contact you or rerun the file. "
                )
            else:
                err_msg += "Unknown or unexpected final status. "
            err_msg += f"[job={id}, final_status={status}]"
            raise RuntimeError(err_msg)

        # Locate the result file for this job in the downloads directory
        remote_filepaths = self.sftp.list_directory("/myDownloads/")
        remote_filename = [filename for filename in remote_filepaths if id in filename][0]
        remote_filepath = "/myDownloads/" + remote_filename

        # Download and unzip the result, then load the tab-delimited file it contains
        temp_file_zip = self.sftp.get_file(remote_filepath)
        temp_dir = tempfile.mkdtemp()
        with ZipFile(temp_file_zip) as zf:
            zf.extractall(path=temp_dir)
        filepath = os.listdir(temp_dir)[0]
        result = Table.from_csv(os.path.join(temp_dir, filepath), delimiter="\t")
        return result

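    # Illustrative sketch (assumes a configured `client` and a Finished job id):
    # pulling the match file directly and selecting the appended match columns.
    # The column selection is an example only; the 'COL#-' prefixes on the
    # original headers depend on the input file.
    #
    #     matches = client.load_matches(job_id)
    #     dwids = matches.cut("DWID", "CONFIDENCE")
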
    def validate_table(self, table: Table, template_id: str = "48827") -> None:
        """Validate table structure and contents."""
        if not template_id == "48827":
            logger.warning(f"No validator implemented for template {template_id}.")
            return

        expected_table_columns = [
            "first_name",
            "middle_name",
            "last_name",
            "name_suffix",
            "addr1",
            "addr2",
            "city",
            "state",
            "zip",
            "phone",
            "email",
            "gender_tomatch",
            "dob",
            "dob_year",
            "matchbackid",
        ]
        required_columns: List[str] = ["first_name", "last_name"]
        actual_table_columns = table.columns

        unexpected_columns = [
            col for col in actual_table_columns if col not in expected_table_columns
        ]
        missing_required_columns = [
            col for col in required_columns if col not in actual_table_columns
        ]

        errors = {}
        if unexpected_columns:
            errors["unexpected_columns"] = unexpected_columns
        if missing_required_columns:
            errors["missing_required_columns"] = missing_required_columns

        if errors:
            raise ValueError(f"Input table does not have the right structure. {errors}")
        else:
            logger.info("Table structure validated.")

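    # Illustrative sketch (assumes a configured `client`): validating a table up
    # front so structural problems surface before any file is uploaded.
    #
    #     try:
    #         client.validate_table(tbl)
    #     except ValueError as exc:
    #         logger.error(f"Table failed Catalist validation: {exc}")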