Utilities

API Connector

class parsons.utilities.api_connector.APIConnector(uri: str, headers: Mapping[str, str | bytes | None] | None = None, auth: tuple[str, str] | AuthBase | Callable[[PreparedRequest], PreparedRequest] | None = None, pagination_key: str | None = None, data_key: str | None = None)[source]

Low-level class for API requests that other connectors can utilize.

It is understood that there are many standards for REST APIs and it will be difficult to create a universal connector. The goal of this class is to create a series of utilities that can be mixed and matched to, hopefully, meet the needs of the specific API.

request(url: str, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], *, json: Any | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, raise_on_error: bool = True, **kwargs) Response[source]

Base request using the requests library.

Return type:

Response

get_request(url: ..., *, params: ... = None, return_format: Literal['json'] = 'json', raise_on_error: ... = True, **kwargs) dict[str, Any][source]
get_request(url: ..., *, params: ... = None, return_format: Literal['content'], raise_on_error: ... = True, **kwargs) bytes

Make a GET request.

Parameters:
  • url – A complete and valid url for the api request.

  • params – The request parameters.

  • raise_on_error – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.

  • **kwargs – Additional keyword arguments to pass to requests.request().

Returns:

The requests.Response.json() from the response if return_format is json, or requests.Response.content from the response if return_format is content.

Raises:

RuntimeError – If return_format is not json or content.

post_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]

Make a POST request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

delete_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]

Make a DELETE request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

put_request(url: str, *, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]

Make a PUT request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

patch_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]

Make a PATCH request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

validate_response(resp: Response) None[source]

Validate that the response is not an error code.

If it is, then raise an error and display the error message.

Parameters:

resp (Response)

Return type:

None

data_parse(resp: dict[str, Any]) dict[str, Any][source]
data_parse(resp: list) list

Determines if the response json has nested data.

If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.
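The unwrapping described above can be pictured with a small standalone function. This is only a sketch, not the library's implementation: `unwrap_data` is a hypothetical name, and its `data_key` argument is assumed to play the role of the constructor's `data_key` parameter.

```python
from typing import Any, Optional


def unwrap_data(resp: Any, data_key: Optional[str]) -> Any:
    """Return the nested records when resp is a dict holding data_key, else resp."""
    if data_key and isinstance(resp, dict) and data_key in resp:
        return resp[data_key]
    return resp


# A multi-record response nests its rows under a key; a single record does not.
nested = {"data": [{"id": 1}, {"id": 2}], "next": None}
single = {"id": 3}

print(unwrap_data(nested, "data"))
print(unwrap_data(single, "data"))
```

Callers can then treat both response shapes uniformly without checking for the key themselves.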

next_page_check_url(resp: dict[str, Any]) bool[source]

Check to determine if there is a next page.

This requires that the response json contains a pagination key that is empty if there is not a next page.

Parameters:

resp (dict[str, Any])

Return type:

bool
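A minimal sketch of the check described above, assuming the pagination key simply maps to a falsy value (None, an empty string, or a missing key) on the last page. `has_next_page` and the example URL are hypothetical and are not part of the library.

```python
from typing import Any, Dict


def has_next_page(resp: Dict[str, Any], pagination_key: str) -> bool:
    """True when the pagination key is present and non-empty."""
    return bool(resp.get(pagination_key))


pages = [
    {"results": [1, 2], "next": "https://example.org/items?page=2"},
    {"results": [3], "next": None},
]
for page in pages:
    print(has_next_page(page, "next"))
```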

json_check(resp: Response) bool[source]

Check to see if a response has a json included in it.

Parameters:

resp (Response)

Return type:

bool

convert_to_table(data: list | Any) Table[source]

Internal method to create a Parsons table from a data element.

Parameters:

data (list | Any)

Return type:

Table

Check ENV

parsons.utilities.check_env.check(env: str, value: T, opt: bool | None = None, *, optional: bool = False, field: T | None = None) T[source]
parsons.utilities.check_env.check(env: str, value: None = None, opt: bool | None = None, *, optional: Literal[True], field: None = None) str | None
parsons.utilities.check_env.check(env: str, value: None = None, opt: bool | None = None, *, optional: Literal[False] = False, field: None = None) str

Check if an environment variable has been set or value has been provided.

Parameters:
  • env – Name of environment variable to check.

  • value – If provided, ignore environment variable and return this.

  • opt – Deprecated; use optional instead. If True, return None if no value is found instead of raising KeyError.

Keyword Arguments:
  • optional – If True, return None if no value is found instead of raising KeyError.

  • field – Deprecated; use value instead. If provided, ignore environment variable and return this.

Returns:

The value of the requested environment variable (str) or the provided value (T). If called with optional=True, will return None if no value is found or provided.

Raises:

KeyError – If no value is found/provided and optional is False.
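The lookup order described above (explicit value first, then the environment, then None or KeyError) can be sketched with the standard library alone. `check_sketch` is a hypothetical stand-in that omits the deprecated opt and field arguments, and the variable names are made up for the example.

```python
import os
from typing import Optional


def check_sketch(env: str, value: Optional[str] = None, *, optional: bool = False) -> Optional[str]:
    """Prefer an explicit value, then the environment, then None or KeyError."""
    if value is not None:
        return value
    if env in os.environ:
        return os.environ[env]
    if optional:
        return None
    raise KeyError(f"No value provided and environment variable {env} is not set.")


os.environ["EXAMPLE_API_KEY"] = "from-env"    # hypothetical variable name
os.environ.pop("EXAMPLE_MISSING_KEY", None)   # make sure this one is unset

print(check_sketch("EXAMPLE_API_KEY"))
print(check_sketch("EXAMPLE_API_KEY", "explicit"))
print(check_sketch("EXAMPLE_MISSING_KEY", optional=True))
```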

Cloud Storage

The Parsons cloud storage utility was created to interact with APIs that require access to files to run an asynchronous process.

The cloud storage utility is currently utilized primarily by NGPVAN class methods such as upload_scores() and upload_saved_list().

These methods have arguments specific to their method, but all also accept the following cloud storage arguments:

  • url_type - The type of cloud storage to utilize. Currently S3 or GCS.

  • **url_kwargs - These are arguments specific to the cloud storage type in order to initialize. They are listed below based on the url type.

The file will then be converted to a CSV, compressed, and posted to the cloud storage. A presigned URL will be generated and remain active for 60 minutes by default, but you can adjust the time.

parsons.utilities.cloud_storage.post_file(tbl, type: Literal['S3', 'GCS'], file_path=None, quoting=0, **file_storage_args)[source]

Generalizable method for moving files to an online file storage class.

It is used by methods that require access to a file via a public url (e.g. VAN).

Supported storage types are S3 and GCS.

Parameters:
  • tbl – Table A parsons.Table object

  • type (Literal['S3', 'GCS']) – str S3 or GCS (Google Cloud Storage)

  • file_path – str The file path to store the file. Not required if provided with the **file_storage_args.

  • quoting – attr The type of quoting to use for the csv.

  • **file_storage_args – kwargs Optional arguments specific to the file storage.

Returns:

None

Amazon S3

Below are the required and optional arguments utilizing Amazon S3 as the cloud storage service:

Argument               Required  Description
---------------------  --------  -------------------------------------------------------
bucket                 Yes       The S3 bucket to post the file to.
aws_access_key         No        Required if AWS_ACCESS_KEY_ID env variable not set.
aws_secret_access_key  No        Required if AWS_SECRET_ACCESS_KEY env variable not set.
public_url_expires     No        Defaults to 60 minutes.

Google Cloud Storage

Below are the required and optional arguments utilizing Google Cloud Storage as the cloud storage service:

Argument           Required  Description
-----------------  --------  ----------------------------------------------------------------
bucket             Yes       The GCS bucket to post the file to.
app_creds          No        Required if GOOGLE_APPLICATION_CREDENTIALS env variable not set.
public_url_expire  No        Defaults to 60 minutes.

Credential Tools

parsons.utilities.credential_tools.decode_credential(credential, save_path=None, export=True, echo=False)[source]

Decode an encoded credential to a Python object.

Parameters:
  • credential – str An encoded credential.

  • save_path – str The path for where to save the decoded credential.

  • export – bool A flag for whether to export the decoded object to the environment. Defaults to True.

  • echo – bool A flag for whether to print the decoded object. Defaults to False.

Returns:

dict

The decoded object.

parsons.utilities.credential_tools.encode_from_json_str(credential)[source]

Encode credential(s) from a json string.

Parameters:

credential – str The credential json string to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_json_file(credential_file)[source]

Encode credential(s) from a json file.

Parameters:

credential_file – str The path to the json file with the credential to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_env(env_variables)[source]

Encode credential(s) from the current environment.

Parameters:

env_variables – list The list of credentials from the environment to be encoded.

Returns:

str

The encoded credential.

parsons.utilities.credential_tools.encode_from_dict(credential)[source]

Encode credential(s) from a dictionary.

Parameters:

credential – dict The dictionary of credentials to be encoded.

Returns:

str

The encoded credential.

Datetime

parsons.utilities.datetime.date_to_timestamp(value, tzinfo=datetime.timezone.utc)[source]

Convert any date value into a Unix timestamp.

Parameters:
  • value – int or str or datetime Value to parse

  • tzinfo – datetime.timezone Optional: Timezone for the datetime; defaults to UTC.

Returns:

Unix timestamp (int)

parsons.utilities.datetime.convert_unix_to_readable(ts)[source]

Converts UNIX timestamps to readable timestamps.

parsons.utilities.datetime.parse_date(value: int | str | datetime, tzinfo=datetime.timezone.utc)[source]

Parse an arbitrary date value into a Python datetime.

If no value is provided (i.e., the value is None or empty), then the return value will be None.

Parameters:
  • value (int | str | datetime) – int or str or datetime Value to parse

  • tzinfo – datetime.timezone Optional: Timezone for the datetime; defaults to UTC.

Returns:

datetime.datetime or None
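The behavior of date_to_timestamp and parse_date can be approximated with the standard library. The sketch below is not the library's code: the `_sketch` functions are hypothetical, strings are assumed to be ISO 8601 (the real functions may accept more formats), and integers are assumed to be Unix seconds.

```python
from datetime import datetime, timezone
from typing import Optional, Union


def parse_date_sketch(
    value: Union[int, str, datetime, None], tzinfo=timezone.utc
) -> Optional[datetime]:
    """Parse Unix seconds, an ISO 8601 string, or a datetime; None/empty gives None."""
    if value is None or value == "":
        return None
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, int):
        parsed = datetime.fromtimestamp(value, tz=tzinfo)
    else:
        parsed = datetime.fromisoformat(value)
    # Attach the default timezone only when the value carries none.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=tzinfo)
    return parsed


def date_to_timestamp_sketch(value, tzinfo=timezone.utc) -> Optional[int]:
    """Convert any supported date value to a Unix timestamp in whole seconds."""
    parsed = parse_date_sketch(value, tzinfo)
    return int(parsed.timestamp()) if parsed is not None else None


print(date_to_timestamp_sketch("2020-01-01T00:00:00"))
print(parse_date_sketch(1577836800).isoformat())
print(parse_date_sketch(None))
```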

Files

parsons.utilities.files.create_temp_file(suffix=None)[source]

Create a temp file that will exist as long as the current script is running.

Parameters:

suffix – str A suffix/extension to add to the end of the temp file name

Returns:

str

The path of the temp file

parsons.utilities.files.create_temp_file_for_path(path)[source]

Creates a temp file that will exist as long as the current script is running, and with a file name mimicking that of the provided path.

Parameters:

path – str Path (or just file name) of the file you want the temp file to mimic.

Returns:

str

The path of the temp file

parsons.utilities.files.string_to_temp_file(string, suffix=None)[source]

Create a temporary file from a string. Currently used for packages that require credentials to be stored as a file.
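A rough standard-library equivalent of writing a string to a temp file that lives for the duration of the script is shown below. It assumes cleanup at interpreter exit is acceptable; `string_to_temp_file_sketch` is a hypothetical name and the library's own cleanup mechanism may differ.

```python
import atexit
import os
import tempfile
from typing import Optional


def string_to_temp_file_sketch(text: str, suffix: Optional[str] = None) -> str:
    """Write text to a new temp file and schedule its removal at interpreter exit."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, "w") as handle:
        handle.write(text)

    def _remove() -> None:
        # Ignore the case where the file was already deleted by the caller.
        if os.path.exists(path):
            os.remove(path)

    atexit.register(_remove)
    return path


creds_path = string_to_temp_file_sketch('{"token": "example"}', suffix=".json")
with open(creds_path) as handle:
    print(handle.read())
```

The returned path can be handed to any package that insists on reading credentials from disk.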

Format Phone Number

parsons.utilities.format_phone_number.format_phone_number(phone_number, country_code='1')[source]

Formats a phone number in E.164 format, which is the international standard for phone numbers. Example: Converts “555-555-5555” -> “+15555555555”

Parameters:
  • phone_number (str) – The phone number to be formatted.

  • country_code (str) – The country code to be used as a prefix. Defaults to “1” (United States).

Returns:

The formatted phone number in E.164 format.

Return type:

str
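The conversion in the example above can be sketched as follows. This is an illustration rather than the library's logic: `format_e164_sketch` is hypothetical, and the rule for detecting an already-prefixed number assumes 10-digit national numbers, which holds for North American numbers but not in general.

```python
import re


def format_e164_sketch(phone_number: str, country_code: str = "1") -> str:
    """Strip non-digits and prefix '+' plus the country code."""
    digits = re.sub(r"\D", "", phone_number)
    # Assumption: more than 10 digits starting with the country code means
    # the number already carries its prefix.
    if len(digits) > 10 and digits.startswith(country_code):
        return f"+{digits}"
    return f"+{country_code}{digits}"


print(format_e164_sketch("555-555-5555"))
print(format_e164_sketch("1 (555) 555-5555"))
```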

Format JSON

parsons.utilities.json_format.arg_format(arg)[source]

Many APIs require arguments to be formatted in camelCase, like ‘thisTypeConfig’, which is not the standard for Python. This method takes an argument such as ‘this_type_config’ and returns it as ‘thisTypeConfig’.
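A snake_case to camelCase conversion of this kind might look like the sketch below; `to_camel_case` is a hypothetical helper, not the library function.

```python
def to_camel_case(arg: str) -> str:
    """Keep the first word as-is and capitalize each following word."""
    head, *tail = arg.split("_")
    return head + "".join(part.capitalize() for part in tail)


print(to_camel_case("this_type_config"))
```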

parsons.utilities.json_format.remove_empty_keys(dirty_dict)[source]

Remove empty keys from a dictionary. This is useful when sending JSON payloads in which a null field would overwrite the existing value with null, which is usually not what you want.
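As a sketch of the idea, assuming "empty" means a value of None (other falsy values such as 0 or an empty string are kept), a hypothetical `drop_none_values` helper could be written as:

```python
from typing import Any, Dict


def drop_none_values(dirty: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the dict without keys whose value is None."""
    return {key: value for key, value in dirty.items() if value is not None}


payload = {"first_name": "Ada", "middle_name": None, "age": 0}
print(drop_none_values(payload))
```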

parsons.utilities.json_format.flatten_json(json)[source]

Flatten nested json to return a dict without nested values. Lists without nested values will be ignored, and lists of dicts will only return the first key value pair for each key. Useful for passing nested json to validation methods.

OAuth API Connector

class parsons.utilities.oauth_api_connector.OAuth2APIConnector(uri: str, client_id: str, client_secret: str, token_url: str, auto_refresh_url: str | None, headers: Mapping[str, str | bytes | None] | None = None, pagination_key: str | None = None, data_key: str | None = None, grant_type: str = 'client_credentials', authorization_kwargs: dict[str, Any] | None = None)[source]

Low level class for authenticated API requests using OAuth2 that other connectors can utilize.

It extends APIConnector by wrapping the request methods in a server-side OAuth2 client. Otherwise, it provides the same interface as APIConnector.

request(url: str, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], *, json: dict | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, raise_on_error: bool = True, **kwargs) Response[source]

Base request using the requests library.

Return type:

Response

token_saver(token: OAuth2Token) None[source]

Replace the token in the class instance.

Parameters:

token (OAuth2Token)

Return type:

None

convert_to_table(data: list | Any) Table

Internal method to create a Parsons table from a data element.

Parameters:

data (list | Any)

Return type:

Table

data_parse(resp: dict[str, Any] | list) dict[str, Any] | list

Determines if the response json has nested data.

If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.

Parameters:

resp (dict[str, Any] | list)

Return type:

dict[str, Any] | list

delete_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None

Make a DELETE request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

get_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, return_format: Literal['json', 'content'] = 'json', raise_on_error: bool = True, **kwargs) dict | bytes

Make a GET request.

Returns:

The requests.Response.json() from the response if return_format is json, or requests.Response.content from the response if return_format is content.

Raises:

RuntimeError – If return_format is not json or content.

Return type:

dict | bytes

json_check(resp: Response) bool

Check to see if a response has a json included in it.

Parameters:

resp (Response)

Return type:

bool

next_page_check_url(resp: dict[str, Any]) bool

Check to determine if there is a next page.

This requires that the response json contains a pagination key that is empty if there is not a next page.

Parameters:

resp (dict[str, Any])

Return type:

bool

patch_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None

Make a PATCH request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

post_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None

Make a POST request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

put_request(url: str, *, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None

Make a PUT request.

Returns:

If successful, JSON data from requests.Response.json() or requests.Response.status_code as available. None if the request fails and raise_on_error is False.

Return type:

dict[str, Any] | int | None

validate_response(resp: Response) None

Validate that the response is not an error code.

If it is, then raise an error and display the error message.

Parameters:

resp (Response)

Return type:

None

SQL Helpers

parsons.utilities.sql_helpers.redact_credentials(sql)[source]

Redact any credentials explicitly represented in SQL (e.g. COPY statement)
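To illustrate the kind of redaction meant here, the sketch below masks a quoted credentials string in a COPY statement. The regular expression and replacement text are assumptions made for this example, and the library's actual pattern may differ.

```python
import re

# Assumption: credentials appear as a single-quoted string after the
# CREDENTIALS keyword, as in a Redshift-style COPY statement.
CREDENTIALS_PATTERN = re.compile(r"credentials\s+'[^']*'", re.IGNORECASE)


def redact_sketch(sql: str) -> str:
    """Replace any quoted credentials clause with a fixed placeholder."""
    return CREDENTIALS_PATTERN.sub("CREDENTIALS REDACTED", sql)


statement = (
    "COPY my_table FROM 's3://bucket/key.csv' "
    "CREDENTIALS 'aws_access_key_id=EXAMPLE;aws_secret_access_key=EXAMPLE' CSV"
)
print(redact_sketch(statement))
```

Redacting before logging keeps secrets out of log files while leaving the rest of the statement readable.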

SSH Utilities

parsons.utilities.ssh_utilities.query_through_ssh(ssh_host, ssh_port, ssh_username, ssh_password, db_host, db_port, db_name, db_username, db_password, query)[source]
Parameters:
  • ssh_host – The host for the SSH connection

  • ssh_port – The port for the SSH connection

  • ssh_username – The username for the SSH connection

  • ssh_password – The password for the SSH connection

  • db_host – The host for the db connection

  • db_port – The port for the db connection

  • db_name – The name of the db database

  • db_username – The username for the db database

  • db_password – The password for the db database

  • query – The SQL query to execute

Returns:

A list of records resulting from the query or None if something went wrong

ZIP Archive

parsons.utilities.zip_archive.create_archive(archive_path, file_path, file_name=None, if_exists: Literal['replace', 'append'] = 'replace')[source]

Create and fill an archive.

Parameters:
  • archive_path – str The file name of zip archive

  • file_path – str The path of the file

  • file_name – str The name of the file in the archive

  • if_exists (Literal['replace', 'append']) – str If archive already exists, one of ‘replace’ or ‘append’

Returns:

Zip archive path

parsons.utilities.zip_archive.unzip_archive(archive_path, destination=None)[source]

Unzip an archive. Only returns the path of the first file in the archive.

Parameters:
  • archive_path – str Path to the ZIP archive

  • destination – str Optional; path to unzip the archive into; if not specified, the

Returns:

Extracted file path.
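The create and unzip steps can be approximated with the standard zipfile module. In this sketch, mapping ‘replace’ to write mode and ‘append’ to append mode is an assumption about how if_exists behaves, and the file names are made up.

```python
import os
import tempfile
import zipfile

workdir = tempfile.mkdtemp()
source_path = os.path.join(workdir, "report.csv")
archive_path = os.path.join(workdir, "report.zip")
with open(source_path, "w") as handle:
    handle.write("id,score\n1,0.5\n")

# Assumed mapping: 'replace' overwrites the archive, 'append' adds to it.
if_exists = "replace"
mode = "w" if if_exists == "replace" else "a"
with zipfile.ZipFile(archive_path, mode) as archive:
    archive.write(source_path, arcname="report.csv")

# Extract everything, but keep only the path of the first member.
destination = tempfile.mkdtemp()
with zipfile.ZipFile(archive_path) as archive:
    first_member = archive.namelist()[0]
    archive.extractall(destination)
extracted_path = os.path.join(destination, first_member)
print(extracted_path)
```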

dbt Utilities

Core methods for running dbt commands.

parsons.utilities.dbt.dbt.run_dbt_commands(commands: str | list[str], dbt_project_directory: Path, dbt_profile_directory: Path | None = None, loggers: list[dbtLogger | type[dbtLogger]] | None = None) list[Manifest][source]

Executes dbt commands within a directory, optionally logs results.

from pathlib import Path
from parsons.utilities.dbt import (
    run_dbt_commands,
    dbtLoggerSlack,
    dbtLoggerPython
)
results = run_dbt_commands(
    commands=["dbt run", "dbt test"],
    dbt_project_directory=Path("/path/to/dbt/project"),
    loggers=[dbtLoggerPython, dbtLoggerSlack]
)
Parameters:
  • commands (str | list[str]) – Union[str, list[str]] A single dbt command as a string or a list of dbt commands to be executed.

  • dbt_project_directory (Path) – Path The path to the dbt project directory where the commands will be executed.

  • dbt_profile_directory (Path | None) – Path, optional The path to find the dbt profile

  • loggers (list[dbtLogger | type[dbtLogger]] | None) – Optional[list[Union[dbtLogger, Type[dbtLogger]]]], default=None A list of logger instances or logger classes. If classes are provided, they will be instantiated. Each logger should have a send method that takes the dbt command results as an argument.

Returns:

list[Manifest]

A list of result objects from the executed dbt commands.

Return type:

list[Manifest]

Logging classes for use with Parsons dbt utility.

class parsons.utilities.dbt.logging.dbtLogger[source]

Abstract base class for aggregating logs from dbt commands.

abstractmethod send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None
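The reason for passing results to send rather than to the constructor can be shown with a toy version of the pattern. Everything in this sketch is hypothetical (`LoggerSketch`, `PrefixLogger`, `run_commands_sketch`), and plain strings stand in for Manifest objects.

```python
from abc import ABC, abstractmethod
from typing import List


class LoggerSketch(ABC):
    """Stand-in for an abstract logger: configured first, fed results later."""

    @abstractmethod
    def send(self, manifests: List[str]) -> None: ...


class PrefixLogger(LoggerSketch):
    def __init__(self, prefix: str) -> None:
        # Options (or credentials) are supplied before any command has run.
        self.prefix = prefix
        self.lines: List[str] = []

    def send(self, manifests: List[str]) -> None:
        for manifest in manifests:
            self.lines.append(f"{self.prefix}: {manifest}")


def run_commands_sketch(commands, loggers):
    results = [f"{command} finished" for command in commands]  # placeholder results
    for logger in loggers:
        # Classes are instantiated on the fly; instances are used as given.
        instance = logger() if isinstance(logger, type) else logger
        instance.send(results)
    return results


logger = PrefixLogger("dbt")
run_commands_sketch(["dbt run", "dbt test"], [logger])
print(logger.lines)
```

Because the logger exists before the commands run, it can hold a webhook URL or database connection that the runner never needs to know about.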

class parsons.utilities.dbt.logging.dbtLoggerMarkdown[source]

Formats dbt results into a structured Markdown summary.

format_result() str[source]

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

Return type:

str

abstractmethod send(manifests: list[Manifest]) None

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None

class parsons.utilities.dbt.logging.dbtLoggerStdout[source]
send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

Return type:

str

class parsons.utilities.dbt.logging.dbtLoggerPython[source]
send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

Return type:

str

class parsons.utilities.dbt.logging.dbtLoggerSlack(slack_webhook: str, slack_channel: str | None = None)[source]
Parameters:
  • slack_webhook (str)

  • slack_channel (str | None)

send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None

format_result() str

Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.

Return type:

str

class parsons.utilities.dbt.logging.dbtLoggerDatabase(database_connector: DatabaseConnector, destination_table_runs: str, destination_table_nodes: str, extra_run_table_fields: dict, **copy_kwargs)[source]

Log dbt artifacts by loading to a database.

This class is an abstract base class for logging dbt artifacts to a database.

format_command_result(manifest: Manifest) tuple[Table, Table][source]

Loads all artifact results into a Parsons Table.

Parameters:

manifest (Manifest)

Return type:

tuple[Table, Table]

format_result() tuple[Table, Table][source]

Returns a table for the dbt runs and a table for the node runs.

Return type:

tuple[Table, Table]

send(manifests: list[Manifest]) None[source]

The send method is called to execute logging.

manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.

Parameters:

manifests (list[Manifest])

Return type:

None

Pydantic data models for use with dbt utilities.

class parsons.utilities.dbt.models.Manifest(command: str, run_execution_result: RunExecutionResult | None = None, *, dbt_manifest: RunExecutionResult | None = None)[source]

A wrapper for dbt execution results.

Parameters:
  • command (str)

  • run_execution_result (RunExecutionResult | None)

  • dbt_manifest (RunExecutionResult | None)

filter_results(**kwargs) list[NodeResult][source]

Subset of NodeResults based on filter.

Return type:

list[NodeResult]

property dbt_manifest: RunExecutionResult

Legacy proxy to new attribute.

property overall_status: NodeStatus

Determine the overall state of the command.

Returns a member of the NodeStatus Enum: Error, Warn, Skipped, or Success.

property skips: list[NodeResult]

Returns skipped model builds but not skipped tests.

property summary: Counter

Aggregates all node outcomes into a count of status strings.
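The overall_status and summary properties can be pictured with a toy status enum. The severity order used below (error, then warn, then skipped, then success) and the `StatusSketch` names are assumptions for illustration only and are not taken from the library or from dbt.

```python
from collections import Counter
from enum import Enum


class StatusSketch(str, Enum):
    ERROR = "error"
    WARN = "warn"
    SKIPPED = "skipped"
    SUCCESS = "success"


# Assumed severity order, worst first.
SEVERITY = [
    StatusSketch.ERROR,
    StatusSketch.WARN,
    StatusSketch.SKIPPED,
    StatusSketch.SUCCESS,
]


def overall_status_sketch(statuses):
    """Return the worst status present, or SUCCESS when there are none."""
    for candidate in SEVERITY:
        if candidate in statuses:
            return candidate
    return StatusSketch.SUCCESS


node_statuses = [StatusSketch.SUCCESS, StatusSketch.WARN, StatusSketch.SUCCESS]
summary = Counter(status.value for status in node_statuses)
print(overall_status_sketch(node_statuses))
print(summary)
```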

property total_gb_processed: float

Total GB processed by full dbt command run.

property total_slot_hours: float

Total slot hours used by full dbt command run.

class parsons.utilities.dbt.models.EnhancedNodeResult(status: RunStatus | TestStatus | FreshnessStatus, timing: List[TimingInfo], thread_id: str, execution_time: float, adapter_response: Dict[str, Any], message: str | None, failures: int | None, node: AnalysisNode | FunctionNode | SingularTestNode | HookNode | ModelNode | SqlNode | GenericTestNode | SnapshotNode | UnitTestNode | SeedNode | SourceDefinition)[source]
Parameters:
  • status (RunStatus | TestStatus | FreshnessStatus)

  • timing (List[TimingInfo])

  • thread_id (str)

  • execution_time (float)

  • adapter_response (Dict[str, Any])

  • message (str | None)

  • failures (int | None)

  • node (AnalysisNode | FunctionNode | SingularTestNode | HookNode | ModelNode | SqlNode | GenericTestNode | SnapshotNode | UnitTestNode | SeedNode | SourceDefinition)

log_message() str | None[source]

Helper method to generate message for logs.

Return type:

str | None