Utilities

API Connector

class parsons.utilities.api_connector.APIConnector(uri, headers=None, auth=None, pagination_key=None, data_key=None)[source]

The API Connector is a low-level class for API requests that other connectors can utilize. There are many standards for REST APIs, so it is difficult to create a universal connector. The goal of this class is to create a series of utilities that can be mixed and matched to, hopefully, meet the needs of the specific API.

Parameters:
  • uri – str The base uri for the api. Must include a trailing ‘/’ (e.g. http://myapi.com/v1/)

  • headers – dict The request headers

  • auth – dict The request authorization parameters

  • pagination_key – str The name of the key in the response json where the pagination url is located. Required for pagination.

  • data_key – str The name of the key in the response json where the data is contained. Required if the data is nested in the response json

Returns:

APIConnector class

request(url, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], json=None, data=None, params=None)[source]

Base request using the requests library.

Parameters:
  • url – str The url request string; if url is a relative URL, it will be joined with the uri of the APIConnector; if url is an absolute URL, it will be used as is.

  • req_type – str The request type. One of GET, POST, PUT, PATCH, DELETE, OPTIONS

  • json – dict The payload of the request object. By using json, it will automatically serialize the dictionary

  • data – str or byte or dict The payload of the request object. Use instead of json in some instances.

  • params – dict The parameters to append to the url (e.g. http://myapi.com/things?id=1)

  • raise_on_error – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.

Returns:

requests response
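The joining rule for url can be illustrated with the standard library. This is a minimal sketch that assumes the connector behaves like urllib.parse.urljoin; resolve_url is a hypothetical helper, not part of APIConnector. It also shows why the base uri needs its trailing ‘/’.

```python
from urllib.parse import urljoin

BASE_URI = "http://myapi.com/v1/"  # trailing slash, as the uri parameter requires


def resolve_url(base_uri: str, url: str) -> str:
    """Hypothetical helper: join a relative url onto base_uri.

    Absolute urls are returned unchanged by urljoin.
    """
    return urljoin(base_uri, url)


# A relative url is appended to the base uri.
relative = resolve_url(BASE_URI, "things")

# An absolute url replaces the base uri entirely.
absolute = resolve_url(BASE_URI, "https://other.example.com/things")

# Without the trailing slash, the last path segment of the base is dropped.
no_slash = resolve_url("http://myapi.com/v1", "things")
```

Under this assumption, forgetting the trailing slash silently sends requests to the wrong path rather than raising an error.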

get_request(url, params=None, return_format='json')[source]

Make a GET request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

Returns:

A requests response object

post_request(url, params=None, data=None, json=None, success_codes=None)[source]

Make a POST request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • success_codes – int The expected success code to be returned. If not provided, accepts 200, 201, 202, and 204.

Returns:

A requests response object

delete_request(url, params=None, success_codes=None)[source]

Make a DELETE request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, 204.

Returns:

A requests response object or status code

put_request(url, data=None, json=None, params=None, success_codes=None)[source]

Make a PUT request.

Parameters:
  • url – str A complete and valid url for the api request

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • params – dict The request parameters

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, 204.

Returns:

A requests response object

patch_request(url, params=None, data=None, json=None, success_codes=None)[source]

Make a PATCH request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, and 204.

Returns:

A requests response object

validate_response(resp)[source]

Validate that the response is not an error code. If it is, then raise an error and display the error message.

Parameters:

resp – object A response object

data_parse(resp)[source]

Determines if the response json has nested data. If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.

Parameters:

resp – A response dictionary

Returns:

dict

A dictionary of data.
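The unwrapping described above can be sketched as follows. unwrap_data is a hypothetical stand-in written for illustration, and the exact checks made by data_parse may differ.

```python
def unwrap_data(resp, data_key=None):
    """Hypothetical stand-in for data_parse.

    If the response is a dict that nests its records under data_key,
    return just those records; otherwise return the response unchanged.
    """
    if isinstance(resp, dict) and data_key and data_key in resp:
        return resp[data_key]
    return resp


# A multi-record response that nests its records under "data".
nested = {"data": [{"id": 1}, {"id": 2}], "next": None}

# A single-record response with no nesting.
single = {"id": 3}

records = unwrap_data(nested, data_key="data")
record = unwrap_data(single, data_key="data")
```

Callers can then treat both response shapes the same way without checking for the key themselves.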

next_page_check_url(resp)[source]

Check to determine if there is a next page. This requires that the response json contains a pagination key that is empty if there is not a next page.

Parameters:

resp – A response dictionary

Returns:

boolean
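A pagination loop built on this kind of check might look like the sketch below. The helper names, the fetch callable, and the fake pages are all assumptions made for illustration; no HTTP request is made.

```python
def has_next_page(resp: dict, pagination_key: str) -> bool:
    """Hypothetical check: an empty or missing pagination key means no next page."""
    return bool(resp.get(pagination_key))


def collect_pages(fetch, first_url, pagination_key, data_key):
    """Follow pagination urls until the pagination key comes back empty."""
    records = []
    url = first_url
    while url:
        resp = fetch(url)
        records.extend(resp.get(data_key, []))
        url = resp[pagination_key] if has_next_page(resp, pagination_key) else None
    return records


# Fake API: maps a url to the response dictionary it would return.
FAKE_PAGES = {
    "page1": {"data": [1, 2], "next": "page2"},
    "page2": {"data": [3], "next": ""},
}

all_records = collect_pages(FAKE_PAGES.__getitem__, "page1", "next", "data")
```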

json_check(resp)[source]

Check to see if a response has a json included in it.

convert_to_table(data)[source]

Internal method to create a Parsons table from a data element.

Check ENV

parsons.utilities.check_env.check(env: str, field: str | None, optional: bool | None = False) str | None[source]

Check whether an environment variable has been set. If it has not been set and no value was passed through the field argument, raise an error.
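The lookup order can be sketched with the standard library alone. resolve_setting is a hypothetical function, and both the precedence of an explicit value over the environment and the KeyError are assumptions of this sketch rather than documented behavior.

```python
import os


def resolve_setting(env, field, optional=False):
    """Hypothetical sketch: prefer the explicit value, then the environment.

    Raises KeyError when neither is available and the setting is required.
    """
    if field:
        return field
    value = os.environ.get(env)
    if value is None and not optional:
        raise KeyError(
            f"No {env} found. Pass the value directly or set the environment variable."
        )
    return value


# SKETCH_API_KEY is a made-up variable name used only for this example.
os.environ["SKETCH_API_KEY"] = "from-env"
from_env = resolve_setting("SKETCH_API_KEY", None)
explicit = resolve_setting("SKETCH_API_KEY", "explicit")
del os.environ["SKETCH_API_KEY"]
missing_optional = resolve_setting("SKETCH_API_KEY", None, optional=True)
```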

Cloud Storage

The Parsons cloud storage utility was created to interact with APIs that require access to files to run an asynchronous process.

The cloud storage utility is currently used primarily by NGPVAN class methods such as upload_scores() and upload_saved_list().

These methods have arguments specific to their method, but all also contain the following cloud storage arguments:

  • url_type - The type of cloud storage to utilize. Currently S3 or GCS.

  • **url_kwargs - These are arguments specific to the cloud storage type in order to initialize. They are listed below based on the url type.

The file will then be converted to a CSV, compressed and posted to the cloud storage. A presigned url will be generated and active by default for 60 minutes, but you can adjust the time.

parsons.utilities.cloud_storage.post_file(tbl, type: Literal['S3', 'GCS'], file_path=None, quoting=0, **file_storage_args)[source]

This utility method is a generalizable method for moving files to an online file storage class. It is used by methods that require access to a file via a public url (e.g. VAN).

S3 is the only option allowed.

Parameters:
  • tbl – object parsons.Table

  • type – str S3 or GCS (Google Cloud Storage)

  • file_path – str The file path to store the file. Not required if provided with the **file_storage_args.

  • quoting – attr The type of quoting to use for the csv.

  • **file_storage_args – kwargs Optional arguments specific to the file storage.

Returns:

None

Amazon S3

Below are the required and optional arguments utilizing Amazon S3 as the cloud storage service:

Argument               Required  Description
bucket                 Yes       The S3 bucket to post the file to.
aws_access_key         No        Required if the AWS_ACCESS_KEY_ID env variable is not set.
aws_secret_access_key  No        Required if the AWS_SECRET_ACCESS_KEY env variable is not set.
public_url_expires     No        Defaults to 60 minutes.

Google Cloud Storage

Below are the required and optional arguments utilizing Google Cloud Storage as the cloud storage service:

Argument           Required  Description
bucket             Yes       The GCS bucket to post the file to.
app_creds          No        Required if the GOOGLE_APPLICATION_CREDENTIALS env variable is not set.
public_url_expire  No        Defaults to 60 minutes.

Credential Tools

parsons.utilities.credential_tools.decode_credential(credential, save_path=None, export=True, echo=False)[source]

Decode an encoded credential to a Python object.

Parameters:
  • credential – str An encoded credential.

  • save_path – str The path for where to save the decoded credential.

  • export – bool A flag for whether to export the decoded object to the environment. Defaults to true.

  • echo – bool A flag for whether to print the decoded object. Defaults to False.

Returns:

dict

The decoded object.

parsons.utilities.credential_tools.encode_from_json_str(credential)[source]

Encode credential(s) from a json string.

Parameters:

credential – str The credential json string to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_json_file(credential_file)[source]

Encode credential(s) from a json file.

Parameters:

credential_file – str The path to the json file with the credential to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_env(env_variables)[source]

Encode credential(s) from the current environment.

Parameters:

env_variables – list The list of credentials from the environment to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_dict(credential)[source]

Encode credential(s) from a dictionary.

Parameters:

credential – dict The dictionary of credentials to be encoded.

Returns:

str

The encoded credential.

Datetime

parsons.utilities.datetime.date_to_timestamp(value, tzinfo=datetime.timezone.utc)[source]

Convert any date value into a Unix timestamp.

Parameters:
  • value – int or str or datetime Value to parse

  • tzinfo – datetime.timezone Optional: Timezone for the datetime; defaults to UTC.

Returns:

Unix timestamp (int)

parsons.utilities.datetime.convert_unix_to_readable(ts)[source]

Converts UNIX timestamps to readable timestamps.

parsons.utilities.datetime.parse_date(value: int | str | datetime, tzinfo=datetime.timezone.utc)[source]

Parse an arbitrary date value into a Python datetime.

If no value is provided (i.e., the value is None or empty), then the return value will be None.

Parameters:
  • value – int or str or datetime Value to parse

  • tzinfo – datetime.timezone Optional: Timezone for the datetime; defaults to UTC.

Returns:

datetime.datetime or None
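The two conversions can be sketched with the standard library. Both functions below are hypothetical; they assume that strings are ISO 8601 and that integers are Unix timestamps in seconds, which is narrower than the "arbitrary date value" the real helpers accept.

```python
from datetime import datetime, timezone


def parse_date_sketch(value, tzinfo=timezone.utc):
    """Hypothetical sketch: turn an int, str, or datetime into an aware datetime."""
    if value is None or value == "":
        return None  # no value provided
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, (int, float)):
        parsed = datetime.fromtimestamp(value, tz=tzinfo)
    else:
        parsed = datetime.fromisoformat(value)  # assumes ISO 8601 input
    if parsed.tzinfo is None:
        # Naive values are interpreted in the requested timezone.
        parsed = parsed.replace(tzinfo=tzinfo)
    return parsed


def to_timestamp_sketch(value, tzinfo=timezone.utc):
    """Hypothetical sketch: convert any supported date value to a Unix timestamp."""
    parsed = parse_date_sketch(value, tzinfo)
    return int(parsed.timestamp()) if parsed else None


epoch = parse_date_sketch(0)
one_day = to_timestamp_sketch("1970-01-02T00:00:00")
```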

Files

parsons.utilities.files.create_temp_file(suffix=None)[source]

Create a temp file that will exist as long as the current script is running.

Parameters:

suffix – str A suffix/extension to add to the end of the temp file name

Returns:

str

The path of the temp file

parsons.utilities.files.create_temp_file_for_path(path)[source]

Creates a temp file that will exist as long as the current script is running, and with a file name mimicking that of the provided path.

Parameters:

path – str Path (or just file name) of the file you want the temp file to mimic.

Returns:

str

The path of the temp file

parsons.utilities.files.string_to_temp_file(string, suffix=None)[source]

Create a temporary file from a string. Currently used for packages that require credentials to be stored as a file.

Format Phone Number

parsons.utilities.format_phone_number.format_phone_number(phone_number, country_code='1')[source]

Formats a phone number in E.164 format, which is the international standard for phone numbers. Example: Converts “555-555-5555” -> “+15555555555”

Parameters:
  • phone_number (str) – The phone number to be formatted.

  • country_code (str) – The country code to be used as a prefix. Defaults to "1".

Returns:

The formatted phone number in E.164 format.

Return type:

str
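The conversion in the example above can be sketched as follows. to_e164 is a hypothetical function, and the rule it uses to detect a number that already carries its country code (more than 10 digits, starting with the code) is an assumption that only suits 10-digit national numbers.

```python
import re


def to_e164(phone_number: str, country_code: str = "1") -> str:
    """Hypothetical sketch: strip punctuation and prefix '+' and the country code."""
    digits = re.sub(r"\D", "", phone_number)
    # Assumption: a number longer than 10 digits that starts with the
    # country code already includes it, so only the '+' is added.
    if len(digits) > 10 and digits.startswith(country_code):
        return "+" + digits
    return "+" + country_code + digits


formatted = to_e164("555-555-5555")
already_prefixed = to_e164("+1 (555) 555-5555")
```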

Format JSON

parsons.utilities.json_format.arg_format(arg)[source]

Many APIs require arguments to be formatted like ‘thisTypeConfig’, which is not the standard for Python, so this method takes an argument ‘this_type_config’ and returns it as ‘thisTypeConfig’.
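One way to perform this conversion is sketched below. snake_to_camel is a hypothetical name and the real arg_format may be implemented differently.

```python
def snake_to_camel(arg: str) -> str:
    """Hypothetical sketch: convert 'this_type_config' to 'thisTypeConfig'."""
    head, *tail = arg.split("_")
    # The first word stays as-is; each later word gets a leading capital.
    return head + "".join(part.capitalize() for part in tail)


converted = snake_to_camel("this_type_config")
```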

parsons.utilities.json_format.remove_empty_keys(dirty_dict)[source]

Remove empty keys from a dictionary. This method is useful when passing jsons in which a null field will update the value to null and you don’t want that.
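A minimal sketch of the idea, assuming that "empty" means a value of None (other falsy values such as 0 or an empty string are kept). drop_none_values is a hypothetical name.

```python
def drop_none_values(dirty: dict) -> dict:
    """Hypothetical sketch: return a copy of the dict without None-valued keys."""
    return {key: value for key, value in dirty.items() if value is not None}


payload = {"first_name": "Ada", "email": None, "visits": 0}
clean = drop_none_values(payload)
```

Sending clean instead of payload leaves the existing email untouched on an API that would otherwise overwrite it with null.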

parsons.utilities.json_format.flatten_json(json)[source]

Flatten nested json to return a dict without nested values. Lists without nested values will be ignored, and lists of dicts will only return the first key value pair for each key. Useful for passing nested json to validation methods.

OAuth API Connector

class parsons.utilities.oauth_api_connector.OAuth2APIConnector(uri: str, client_id: str, client_secret: str, token_url: str, auto_refresh_url: str | None, headers: dict[str, str] | None = None, pagination_key: str | None = None, data_key: str | None = None, grant_type: str = 'client_credentials', authorization_kwargs: dict[str, str] | None = None)[source]

The OAuth2API Connector is a low level class for authenticated API requests using OAuth2. It extends APIConnector by wrapping the request methods in a server-side OAuth2 client and otherwise provides the same interface as APIConnector.

Parameters:
  • uri – str The base uri for the api. Must include a trailing ‘/’ (e.g. http://myapi.com/v1/)

  • client_id – str The client id for acquiring and exchanging tokens from the OAuth2 application

  • client_secret – str The client secret for acquiring and exchanging tokens from the OAuth2 application

  • token_url – str The URL for acquiring new tokens from the OAuth2 Application

  • auto_refresh_url – str If provided, the URL for refreshing tokens from the OAuth2 Application

  • headers – dict The request headers

  • pagination_key – str The name of the key in the response json where the pagination url is located. Required for pagination.

  • data_key – str The name of the key in the response json where the data is contained. Required if the data is nested in the response json

Returns:

OAuth2APIConnector class

request(url, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], json=None, data=None, params=None)[source]

Base request using the requests library.

Parameters:
  • url – str The url request string; if url is a relative URL, it will be joined with the uri of the OAuth2APIConnector; if url is an absolute URL, it will be used as is.

  • req_type – str The request type. One of GET, POST, PUT, PATCH, DELETE, OPTIONS

  • json – dict The payload of the request object. By using json, it will automatically serialize the dictionary

  • data – str or byte or dict The payload of the request object. Use instead of json in some instances.

  • params – dict The parameters to append to the url (e.g. http://myapi.com/things?id=1)

Returns:

requests response

convert_to_table(data)

Internal method to create a Parsons table from a data element.

data_parse(resp)

Determines if the response json has nested data. If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.

Parameters:

resp – A response dictionary

Returns:

dict

A dictionary of data.

delete_request(url, params=None, success_codes=None)

Make a DELETE request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, 204.

Returns:

A requests response object or status code

get_request(url, params=None, return_format='json')

Make a GET request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

Returns:

A requests response object

json_check(resp)

Check to see if a response has a json included in it.

next_page_check_url(resp)

Check to determine if there is a next page. This requires that the response json contains a pagination key that is empty if there is not a next page.

Parameters:

resp – A response dictionary

Returns:

boolean

patch_request(url, params=None, data=None, json=None, success_codes=None)

Make a PATCH request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, and 204.

Returns:

A requests response object

post_request(url, params=None, data=None, json=None, success_codes=None)

Make a POST request.

Parameters:
  • url – str A complete and valid url for the api request

  • params – dict The request parameters

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • success_codes – int The expected success code to be returned. If not provided, accepts 200, 201, 202, and 204.

Returns:

A requests response object

put_request(url, data=None, json=None, params=None, success_codes=None)

Make a PUT request.

Parameters:
  • url – str A complete and valid url for the api request

  • data – str or file A data object to post

  • json – dict A JSON object to post

  • params – dict The request parameters

  • success_codes – int The expected success codes to be returned. If not provided, accepts 200, 201, 204.

Returns:

A requests response object

validate_response(resp)

Validate that the response is not an error code. If it is, then raise an error and display the error message.

Parameters:

resp – object A response object

SQL Helpers

parsons.utilities.sql_helpers.redact_credentials(sql)[source]

Redact any credentials explicitly represented in SQL (e.g. COPY statement)
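The kind of redaction described can be sketched with a regular expression. This is an illustration only: the pattern, the replacement text, and the redact_sketch name are assumptions, and the sample key values are placeholders.

```python
import re

# Matches a CREDENTIALS clause followed by a single-quoted string.
CREDENTIALS_CLAUSE = re.compile(r"credentials\s+'[^']*'", re.IGNORECASE)


def redact_sketch(sql: str) -> str:
    """Hypothetical sketch: replace a quoted CREDENTIALS clause with a marker."""
    return CREDENTIALS_CLAUSE.sub("CREDENTIALS REDACTED", sql)


copy_sql = (
    "COPY my_table FROM 's3://bucket/key' "
    "CREDENTIALS 'aws_access_key_id=EXAMPLEID;aws_secret_access_key=EXAMPLESECRET' "
    "CSV"
)
safe_sql = redact_sketch(copy_sql)
```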

SSH Utilities

parsons.utilities.ssh_utilities.query_through_ssh(ssh_host, ssh_port, ssh_username, ssh_password, db_host, db_port, db_name, db_username, db_password, query)[source]
Parameters:
  • ssh_host – The host for the SSH connection

  • ssh_port – The port for the SSH connection

  • ssh_username – The username for the SSH connection

  • ssh_password – The password for the SSH connection

  • db_host – The host for the db connection

  • db_port – The port for the db connection

  • db_name – The name of the db database

  • db_username – The username for the db database

  • db_password – The password for the db database

  • query – The SQL query to execute

Returns:

A list of records resulting from the query or None if something went wrong

ZIP Archive

parsons.utilities.zip_archive.create_archive(archive_path, file_path, file_name=None, if_exists: Literal['replace', 'append'] = 'replace')[source]

Create and fill an archive.

Parameters:
  • archive_path – str The file name of zip archive

  • file_path – str The path of the file

  • file_name – str The name of the file in the archive

  • if_exists – str If archive already exists, one of ‘replace’ or ‘append’

Returns:

Zip archive path

parsons.utilities.zip_archive.unzip_archive(archive_path, destination=None)[source]

Unzip an archive. Only returns the path of the first file in the archive.

Parameters:
  • archive_path – str Path to the ZIP archive

  • destination – str Optional; path to unzip the archive into; if not specified, the

Returns:

Extracted file path.
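The behavior of both functions can be approximated with the standard zipfile module. The sketch below uses hypothetical function names. Its fallback destination (a new temporary directory) is an assumption of the sketch, since the default is not stated above.

```python
import os
import tempfile
import zipfile


def create_archive_sketch(archive_path, file_path, file_name=None, if_exists="replace"):
    """Hypothetical sketch: add one file to a zip, replacing or appending."""
    mode = "a" if if_exists == "append" else "w"
    with zipfile.ZipFile(archive_path, mode) as archive:
        archive.write(file_path, arcname=file_name or os.path.basename(file_path))
    return archive_path


def unzip_first_sketch(archive_path, destination=None):
    """Hypothetical sketch: extract everything, return the first file's path."""
    # Assumption: fall back to a fresh temporary directory.
    destination = destination or tempfile.mkdtemp()
    with zipfile.ZipFile(archive_path) as archive:
        first = archive.namelist()[0]
        archive.extractall(path=destination)
    return os.path.join(destination, first)


workdir = tempfile.mkdtemp()
source = os.path.join(workdir, "report.csv")
with open(source, "w") as handle:
    handle.write("id,name\n1,Ada\n")

archive = create_archive_sketch(os.path.join(workdir, "report.zip"), source)
extracted = unzip_first_sketch(archive, destination=os.path.join(workdir, "out"))
```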

dbt Utilities

Core methods for running dbt commands.

parsons.utilities.dbt.dbt.run_dbt_commands(commands: str | list[str], dbt_project_directory: Path, dbt_profile_directory: Path | None = None, loggers: list[dbtLogger | type[dbtLogger]] | None = None) list[Manifest][source]

Executes dbt commands within a directory, optionally logs results.

from pathlib import Path
from parsons.utilities.dbt import (
    run_dbt_commands,
    dbtLoggerSlack,
    dbtLoggerPython
)
results = run_dbt_commands(
    commands=["dbt run", "dbt test"],
    dbt_project_directory=Path("/path/to/dbt/project"),
    loggers=[dbtLoggerPython, dbtLoggerSlack]
)
Parameters:
  • commands – Union[str, list[str]] A single dbt command as a string or a list of dbt commands to be executed.

  • dbt_project_directory – Path The path to the dbt project directory where the commands will be executed.

  • dbt_profile_directory – Path, optional The path to find the dbt profile

  • loggers – Optional[list[Union[dbtLogger, Type[dbtLogger]]]], default=None A list of logger instances or logger classes. If classes are provided, they will be instantiated. Each logger should have a send method that takes the dbt command results as an argument.

Returns:

list[Manifest]

A list of result objects from the executed dbt commands.

Logging classes for use with Parsons dbt utility.

class parsons.utilities.dbt.logging.dbtLogger[source]

Abstract base class for aggregating logs from dbt commands.

abstractmethod send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
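The design can be illustrated without dbt installed. The classes and the runner below are hypothetical stand-ins: they show a logger being configured first and receiving results later through send, and a runner that instantiates any logger passed as a class.

```python
from abc import ABC, abstractmethod


class LoggerSketch(ABC):
    """Hypothetical stand-in for the abstract logger base class."""

    @abstractmethod
    def send(self, manifests: list) -> None:
        ...


class ListLogger(LoggerSketch):
    """Collects one line per result; configured before any command runs."""

    def __init__(self, prefix: str = "dbt"):
        self.prefix = prefix
        self.lines = []

    def send(self, manifests: list) -> None:
        for manifest in manifests:
            self.lines.append(f"{self.prefix}: {manifest}")


def run_commands_sketch(commands, loggers):
    """Stand-in runner: fake the results, then hand them to every logger."""
    manifests = [f"result of {command!r}" for command in commands]
    # Logger classes are instantiated; logger instances are used as given.
    ready = [logger() if isinstance(logger, type) else logger for logger in loggers]
    for logger in ready:
        logger.send(manifests)
    return manifests


configured = ListLogger(prefix="nightly")  # created before the run
results = run_commands_sketch(["dbt run", "dbt test"], [configured, ListLogger])
```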

class parsons.utilities.dbt.logging.dbtLoggerMarkdown[source]

Formats dbt results into a structured Markdown summary.

format_result() str[source]

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

abstractmethod send(manifests: list[Manifest]) None

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

class parsons.utilities.dbt.logging.dbtLoggerStdout[source]
send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

class parsons.utilities.dbt.logging.dbtLoggerPython[source]
send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

class parsons.utilities.dbt.logging.dbtLoggerSlack(slack_webhook: str, slack_channel: str | None = None)[source]
send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

class parsons.utilities.dbt.logging.dbtLoggerDatabase(database_connector: DatabaseConnector, destination_table_runs: str, destination_table_nodes: str, extra_run_table_fields: dict, **copy_kwargs)[source]

Log dbt artifacts by loading to a database.

This class is an abstract base class for logging dbt artifacts to a database.

format_command_result(manifest: Manifest) tuple[Table, Table][source]

Loads all artifact results into a Parsons Table.

format_result() tuple[Table, Table][source]

Returns a table for the dbt runs and a table for the node runs.

send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Pydantic data models for use with dbt utilities.

class parsons.utilities.dbt.models.Manifest(command: str, run_execution_result: RunExecutionResult | None = None, *, dbt_manifest: RunExecutionResult | None = None)[source]

A wrapper for dbt execution results.

filter_results(**kwargs) list[NodeResult][source]

Subset of NodeResults based on filter.

property dbt_manifest: RunExecutionResult

Legacy proxy to new attribute.

property overall_status: NodeStatus

Determine the overall state of the command.

Returns a member of the NodeStatus Enum: Error, Warn, Skipped, or Success.
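A worst-case reduction over node statuses might look like the sketch below. StatusSketch is a hypothetical enum, and the severity order (Error, then Warn, then Skipped, then Success) is an assumption based on the order in which the statuses are listed above. Treating an empty run as Success is likewise an assumption.

```python
from enum import Enum


class StatusSketch(Enum):
    """Hypothetical stand-in for the status enum."""

    ERROR = "error"
    WARN = "warn"
    SKIPPED = "skipped"
    SUCCESS = "success"


# Assumed severity order, worst first.
SEVERITY = [
    StatusSketch.ERROR,
    StatusSketch.WARN,
    StatusSketch.SKIPPED,
    StatusSketch.SUCCESS,
]


def overall_status_sketch(statuses):
    """Return the most severe status present in the given node statuses."""
    for candidate in SEVERITY:
        if candidate in statuses:
            return candidate
    return StatusSketch.SUCCESS  # assumption: no nodes counts as success


mixed = overall_status_sketch(
    [StatusSketch.SUCCESS, StatusSketch.WARN, StatusSketch.SKIPPED]
)
```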

property skips: list[NodeResult]

Returns skipped model builds but not skipped tests.

property summary: Counter

Aggregates all node outcomes into a count of status strings.

property total_gb_processed: float

Total GB processed by full dbt command run.

property total_slot_hours: float

Total slot hours used by full dbt command run.

class parsons.utilities.dbt.models.EnhancedNodeResult(status: RunStatus | TestStatus | FreshnessStatus, timing: List[TimingInfo], thread_id: str, execution_time: float, adapter_response: Dict[str, Any], message: str | None, failures: int | None, node: AnalysisNode | FunctionNode | SingularTestNode | HookNode | ModelNode | SqlNode | GenericTestNode | SnapshotNode | UnitTestNode | SeedNode | SourceDefinition)[source]
log_message() str | None[source]

Helper method to generate message for logs.