Utilities
API Connector
- class parsons.utilities.api_connector.APIConnector(uri: str, headers: Mapping[str, str | bytes | None] | None = None, auth: tuple[str, str] | AuthBase | Callable[[PreparedRequest], PreparedRequest] | None = None, pagination_key: str | None = None, data_key: str | None = None)[source]¶
Low level class for API requests that other connectors can utilize.
It is understood that there are many standards for REST APIs and it will be difficult to create a universal connector. The goal of this class is to create a series of utilities that can be mixed and matched to, hopefully, meet the needs of a specific API.
- Parameters:
- request(url: str, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], *, json: Any | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, raise_on_error: bool = True, **kwargs) Response[source]¶
Base request using the requests library.
- Parameters:
url (str) – The URL request string. If url is a relative URL, it will be joined with the uri of the APIConnector. If url is an absolute URL, it will be used as is.
req_type (Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS']) – The request type.
json (Any | None) – The payload of the request object. By using json, it will automatically serialize the dictionary.
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – The payload of the request object. Use instead of json in some instances.
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The parameters to append to the url. E.g.
http://myapi.com/things?id=1
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Return type:
Response
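The relative-versus-absolute rule for url can be pictured with the standard library's urljoin. This is only a sketch of the behaviour described above, not the connector's own code: resolve_url is a hypothetical helper, and the base URI and paths are made up for illustration.

```python
from urllib.parse import urljoin


def resolve_url(base_uri: str, url: str) -> str:
    """Hypothetical helper: join a relative url onto base_uri, keep absolute urls as-is."""
    return urljoin(base_uri, url)


base = "https://myapi.com/v1/"  # assumed base URI, with a trailing slash

# A relative path is appended to the base URI.
relative = resolve_url(base, "things")

# An absolute URL replaces the base entirely.
absolute = resolve_url(base, "https://other.example/status")

print(relative)
print(absolute)
```

With urljoin, the trailing slash on the base matters: without it, the last path segment of the base is replaced rather than extended.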
- get_request(url: ..., *, params: ... = None, return_format: Literal['json'] = 'json', raise_on_error: ... = True, **kwargs) dict[str, Any][source]¶
- get_request(url: ..., *, params: ... = None, return_format: Literal['content'], raise_on_error: ... = True, **kwargs) bytes
Make a GET request.
- Parameters:
url – A complete and valid url for the api request.
params – The request parameters.
raise_on_error – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
The requests.Response.json() from the response if return_format is json, or requests.Response.content from the response if return_format is content.
- Raises:
RuntimeError – If return_format is not json or content.
- post_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]¶
Make a POST request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 202, and 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
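The "looping through data" case mentioned for raise_on_error might look like the sketch below. It relies only on the documented contract that a failed request returns None when raise_on_error is False. The names post_each and fake_post are hypothetical; fake_post stands in for a connector's post_request so the example runs without a network.

```python
from typing import Any, Callable, Optional


def post_each(post: Callable[..., Optional[Any]], url: str, records: list) -> list:
    """Post records one at a time and collect the ones that failed."""
    failed = []
    for record in records:
        # With raise_on_error=False, a failed request returns None instead of raising.
        result = post(url, json=record, raise_on_error=False)
        if result is None:
            failed.append(record)
    return failed


def fake_post(url: str, *, json: dict, raise_on_error: bool = True) -> Optional[dict]:
    """Stand-in for a connector's post_request: rejects records without a name."""
    if "name" not in json:
        return None
    return {"created": json["name"]}


failed = post_each(fake_post, "things", [{"name": "a"}, {"id": 2}])
print(failed)
```

Comparing against None, rather than testing truthiness, keeps an empty JSON body from being mistaken for a failure.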
- delete_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]¶
Make a DELETE request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- put_request(url: str, *, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]¶
Make a PUT request.
- Parameters:
url (str) – A complete and valid url for the api request
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- patch_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None[source]¶
Make a PATCH request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, and 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- validate_response(resp: Response) None[source]¶
Validate that the response is not an error code.
If it is, then raise an error and display the error message.
- Parameters:
resp (Response)
- Return type:
None
- data_parse(resp: dict[str, Any]) dict[str, Any][source]¶
- data_parse(resp: list) list
Determines if the response json has nested data.
If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.
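One way to picture the unwrapping is the sketch below. It assumes the nested records sit under the connector's data_key, which the text above does not spell out; data_parse_sketch is a hypothetical stand-in, not the library's implementation.

```python
from typing import Any, Optional, Union


def data_parse_sketch(resp: Union[dict, list], data_key: Optional[str]) -> Any:
    """Return the nested records if resp is a dict holding data_key, else resp unchanged."""
    if data_key and isinstance(resp, dict) and data_key in resp:
        return resp[data_key]
    return resp


# A multi-record response wraps its rows under the data key (assumed to be "data" here).
many = data_parse_sketch({"data": [{"id": 1}, {"id": 2}], "next": None}, "data")

# A single-record response has no wrapper and is passed through untouched.
single = data_parse_sketch({"id": 1}, "data")

print(many)
print(single)
```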
- next_page_check_url(resp: dict[str, Any]) bool[source]¶
Check to determine if there is a next page.
This requires that the response json contains a pagination key that is empty if there is not a next page.
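A pagination loop built on that check could be sketched as follows. The key names "next" and "results", and the helpers has_next_page and fetch_all, are assumptions made for illustration; a dictionary of canned pages stands in for real GET requests.

```python
def has_next_page(resp: dict, pagination_key: str) -> bool:
    """True when the pagination key holds a non-empty value."""
    return bool(resp.get(pagination_key))


def fetch_all(get_page, first_url: str, pagination_key: str = "next") -> list:
    """Follow next-page links until the pagination key comes back empty."""
    records, url = [], first_url
    while url:
        resp = get_page(url)
        records.extend(resp["results"])
        url = resp[pagination_key] if has_next_page(resp, pagination_key) else None
    return records


# Canned responses standing in for an API; the last page has an empty "next".
pages = {
    "page1": {"results": [1, 2], "next": "page2"},
    "page2": {"results": [3], "next": ""},
}

all_records = fetch_all(pages.__getitem__, "page1")
print(all_records)
```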
Check ENV
- parsons.utilities.check_env.check(env: str, value: T, opt: bool | None = None, *, optional: bool = False, field: T | None = None) T[source]¶
- parsons.utilities.check_env.check(env: str, value: None = None, opt: bool | None = None, *, optional: Literal[True], field: None = None) str | None
- parsons.utilities.check_env.check(env: str, value: None = None, opt: bool | None = None, *, optional: Literal[False] = False, field: None = None) str
Check if an environment variable has been set or value has been provided.
- Parameters:
env – Name of environment variable to check.
value – If provided, ignore environment variable and return this.
opt – Deprecated; use optional instead. If True, return None if no value is found instead of raising KeyError.
- Keyword Arguments:
optional – If True, return None if no value is found instead of raising KeyError.
field – Deprecated; use value instead. If provided, ignore environment variable and return this.
- Returns:
The value of the requested environment variable (str) or the provided value (T). If called with optional=True, will return None if no value is found or provided.
- Raises:
KeyError – If no value is found/provided and optional is False.
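The lookup order described above (explicit value first, then the environment, then None or KeyError) can be sketched with os.environ. check_sketch is a hypothetical stand-in that omits the deprecated opt and field arguments, and the variable names are invented; how the real function treats empty strings is not covered here.

```python
import os
from typing import Optional


def check_sketch(env: str, value: Optional[str] = None, *, optional: bool = False) -> Optional[str]:
    """Illustrative stand-in for check(): an explicit value wins, then the environment."""
    if value is not None:
        return value
    if env in os.environ:
        return os.environ[env]
    if optional:
        return None
    raise KeyError(f"No value provided and environment variable {env} is not set.")


os.environ["MY_API_KEY"] = "from-env"
os.environ.pop("MY_UNSET_VARIABLE_FOR_DEMO", None)  # make sure it is really unset

explicit = check_sketch("MY_API_KEY", "from-arg")   # provided value is returned
from_env = check_sketch("MY_API_KEY")               # falls back to the environment
missing = check_sketch("MY_UNSET_VARIABLE_FOR_DEMO", optional=True)  # None, no error
```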
Cloud Storage
The Parsons cloud storage utility was created to interact with APIs that require access to files to run an asynchronous process.
The cloud storage utility is currently used primarily by NGPVAN class methods such as upload_scores() and upload_saved_list().
These methods have arguments specific to their method, but all also accept the following cloud storage arguments:
url_type - The type of cloud storage to use. Currently S3 or GCS.
**url_kwargs - Arguments specific to the cloud storage type, needed to initialize it. They are listed below by url type.
The file will then be converted to a CSV, compressed, and posted to the cloud storage. A presigned URL will be generated and active by default for 60 minutes, but you can adjust the time.
- parsons.utilities.cloud_storage.post_file(tbl, type: Literal['S3', 'GCS'], file_path=None, quoting=0, **file_storage_args)[source]¶
Generalizable method for moving files to an online file storage class.
It is used by methods that require access to a file via a public url (e.g. VAN).
S3 is the only option allowed.
- Parameters:
tbl (Table) – A parsons.Table object
type (Literal['S3', 'GCS']) – S3 or GCS (Google Cloud Storage)
file_path (str) – The file path to store the file. Not required if provided with the **file_storage_args.
quoting (attr) – The type of quoting to use for the CSV.
**file_storage_args (kwargs) – Optional arguments specific to the file storage.
- Returns:
None
Amazon S3
Below are the required and optional arguments utilizing Amazon S3 as the cloud storage service:
| Argument | Required | Description |
|---|---|---|
| | Yes | The S3 bucket to post the file |
| | No | Required if |
| | No | Required if |
| | No | Defaults to 60 minutes. |
Google Cloud Storage
Below are the required and optional arguments utilizing Google Cloud Storage as the cloud storage service:
| Argument | Required | Description |
|---|---|---|
| | Yes | The Google Cloud Storage bucket to post the file |
| | No | Required if |
| | No | Defaults to 60 minutes. |
Credential Tools
- parsons.utilities.credential_tools.decode_credential(credential, save_path=None, export=True, echo=False)[source]¶
Decode an encoded credential to a Python object.
- Parameters:
credential (str) – An encoded credential.
save_path (str) – The path for where to save the decoded credential.
export (bool) – A flag for whether to export the decoded object to the environment. Defaults to True.
echo (bool) – A flag for whether to print the decoded object. Defaults to False.
- Returns:
- dict
The decoded object.
- parsons.utilities.credential_tools.encode_from_json_str(credential)[source]¶
Encode credential(s) from a json string.
- Parameters:
credential (str) – The credential JSON string to be encoded.
- Returns:
- str
The encoded credential.
- parsons.utilities.credential_tools.encode_from_json_file(credential_file)[source]¶
Encode credential(s) from a json file.
- Parameters:
credential_file (str) – The path to the JSON file with the credential to be encoded.
- Returns:
- str
The encoded credential.
Datetime
- parsons.utilities.datetime.date_to_timestamp(value, tzinfo=datetime.timezone.utc)[source]¶
Convert any date value into a Unix timestamp.
- Parameters:
value (int or str or datetime) – Value to parse
tzinfo (datetime.timezone) – Optional; timezone for the datetime. Defaults to UTC.
- Returns:
Unix timestamp (int)
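As a rough illustration of what such a conversion involves, the sketch below handles the three input types with the standard library. It is not the library's implementation: to_timestamp_sketch is hypothetical, it only accepts ISO 8601 strings, and treating an int as an existing timestamp is an assumption.

```python
from datetime import datetime, timezone
from typing import Union


def to_timestamp_sketch(value: Union[int, str, datetime], tzinfo: timezone = timezone.utc) -> int:
    """Convert an int, ISO 8601 string, or datetime into a Unix timestamp."""
    if isinstance(value, int):
        return value  # assumed to already be a Unix timestamp
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        # Naive datetimes are interpreted in the supplied timezone.
        value = value.replace(tzinfo=tzinfo)
    return int(value.timestamp())


from_string = to_timestamp_sketch("2020-01-01T00:00:00")
from_datetime = to_timestamp_sketch(datetime(2020, 1, 1, tzinfo=timezone.utc))
print(from_string, from_datetime)
```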
- parsons.utilities.datetime.convert_unix_to_readable(ts)[source]¶
Converts UNIX timestamps to readable timestamps.
Files
- parsons.utilities.files.create_temp_file(suffix=None)[source]¶
Create a temp file that will exist as long as the current script is running.
- Parameters:
suffix (str) – A suffix/extension to add to the end of the temp file name
- Returns:
- str
The path of the temp file
- parsons.utilities.files.create_temp_file_for_path(path)[source]¶
Creates a temp file that will exist as long as the current script is running, and with a file name mimicking that of the provided path.
- Parameters:
path (str) – Path (or just file name) of the file you want the temp file to mimic.
- Returns:
- str
The path of the temp file
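The two behaviours (a temp file with a chosen suffix, and a temp file that borrows another file's name) can be approximated with the tempfile module. The helpers below are hypothetical sketches; unlike the documented functions, they do not clean up after the script ends, and the example path is invented.

```python
import os
import tempfile


def temp_file_sketch(suffix=None) -> str:
    """Create an empty temp file and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # only the path is needed, so release the handle
    return path


def temp_file_for_path_sketch(path: str) -> str:
    """Create a temp file whose file name matches the file name in path."""
    directory = tempfile.mkdtemp()
    temp_path = os.path.join(directory, os.path.basename(path))
    open(temp_path, "w").close()  # create the empty file
    return temp_path


csv_path = temp_file_sketch(suffix=".csv")
mimic_path = temp_file_for_path_sketch("exports/2024/voters.csv")
print(csv_path)
print(mimic_path)
```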
Format Phone Number
Format JSON
- parsons.utilities.json_format.arg_format(arg)[source]¶
Many APIs require arguments to be formatted like ‘thisTypeConfig’, which is not the standard for Python. This method takes an argument such as ‘this_type_config’ and returns it as ‘thisTypeConfig’.
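The conversion can be sketched in a few lines. arg_format_sketch is a hypothetical re-implementation written for illustration, not the library's code, and it assumes the input is already lower-case snake_case.

```python
def arg_format_sketch(arg: str) -> str:
    """Convert snake_case to lower camelCase."""
    first, *rest = arg.split("_")
    # The first word stays lower-case; each later word gets a capital initial.
    return first + "".join(word.capitalize() for word in rest)


converted = arg_format_sketch("this_type_config")
print(converted)
```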
OAuth API Connector
- class parsons.utilities.oauth_api_connector.OAuth2APIConnector(uri: str, client_id: str, client_secret: str, token_url: str, auto_refresh_url: str | None, headers: Mapping[str, str | bytes | None] | None = None, pagination_key: str | None = None, data_key: str | None = None, grant_type: str = 'client_credentials', authorization_kwargs: dict[str, Any] | None = None)[source]¶
Low level class for authenticated API requests using OAuth2 that other connectors can utilize.
It extends APIConnector by wrapping the request methods in a server-side OAuth2 client. Otherwise, it provides the same interface as APIConnector.
- Parameters:
- request(url: str, req_type: Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], *, json: dict | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, raise_on_error: bool = True, **kwargs) Response[source]¶
Base request using the requests library.
- Parameters:
url (str) – The URL request string. If url is a relative URL, it will be joined with the uri of the OAuth2APIConnector. If url is an absolute URL, it will be used as is.
req_type (Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS']) – The request type. One of GET, POST, PUT, PATCH, DELETE, or OPTIONS.
json (dict | None) – The payload of the request object. By using json, it will automatically serialize the dictionary.
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – The payload of the request object. Used instead of json in some instances.
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The parameters to append to the url (e.g. http://myapi.com/things?id=1)
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Return type:
Response
- token_saver(token: OAuth2Token) None[source]¶
Replace the token in the class instance.
- Parameters:
token (OAuth2Token)
- Return type:
None
- convert_to_table(data: list | Any) Table¶
Internal method to create a Parsons table from a data element.
- data_parse(resp: dict[str, Any] | list) dict[str, Any] | list¶
Determines if the response json has nested data.
If it is nested, it just returns the data. This is useful in dealing with requests that might return multiple records, while others might return only a single record.
- delete_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None¶
Make a DELETE request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- get_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, return_format: Literal['json', 'content'] = 'json', raise_on_error: bool = True, **kwargs) dict | bytes¶
Make a GET request.
- Parameters:
url (str) – A complete and valid url for the api request.
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
return_format (Literal['json', 'content'])
- Returns:
The requests.Response.json() from the response if return_format is json, or requests.Response.content from the response if return_format is content.
- Raises:
RuntimeError – If return_format is not json or content.
- Return type:
dict | bytes
- next_page_check_url(resp: dict[str, Any]) bool¶
Check to determine if there is a next page.
This requires that the response json contains a pagination key that is empty if there is not a next page.
- patch_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None¶
Make a PATCH request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, and 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- post_request(url: str, *, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None¶
Make a POST request.
- Parameters:
url (str) – A complete and valid url for the api request
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 202, and 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
- put_request(url: str, *, data: Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None = None, json: Any | None = None, params: Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None = None, success_codes: list[int] | None = None, raise_on_error: bool = True, **kwargs) dict[str, Any] | int | None¶
Make a PUT request.
- Parameters:
url (str) – A complete and valid url for the api request
data (Iterable[bytes] | str | bytes | list[tuple[Any, Any]] | tuple[tuple[Any, Any], ...] | Mapping[Any, Any] | None) – A data object to post
json (Any | None) – A JSON object to post
params (Mapping[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None] | Iterable[tuple[str | bytes | int | float, str | bytes | int | float | Iterable[str | bytes | int | float] | None]] | str | bytes | None) – The request parameters
success_codes (list[int] | None) – The expected success codes to be returned. If not provided, accepts 200, 201, 204.
raise_on_error (bool) – If the request yields an error status code (400 or above), raise an error. In most cases this should be True; however, if you are looping through data, you might want to ignore individual failures.
**kwargs – Additional keyword arguments to pass to requests.request().
- Returns:
If successful, JSON data from requests.Response.json() or requests.Response.status_code, as available. None if the request fails and raise_on_error is False.
- Return type:
dict[str, Any] | int | None
SQL Helpers
SSH Utilities
- parsons.utilities.ssh_utilities.query_through_ssh(ssh_host, ssh_port, ssh_username, ssh_password, db_host, db_port, db_name, db_username, db_password, query)[source]¶
- Parameters:
ssh_host – The host for the SSH connection
ssh_port – The port for the SSH connection
ssh_username – The username for the SSH connection
ssh_password – The password for the SSH connection
db_host – The host for the db connection
db_port – The port for the db connection
db_name – The name of the db database
db_username – The username for the db database
db_password – The password for the db database
query – The SQL query to execute
- Returns:
A list of records resulting from the query or None if something went wrong
ZIP Archive
- parsons.utilities.zip_archive.create_archive(archive_path, file_path, file_name=None, if_exists: Literal['replace', 'append'] = 'replace')[source]¶
Create and fill an archive.
- Parameters:
archive_path (str) – The file name of the ZIP archive
file_path (str) – The path of the file
file_name (str) – The name of the file in the archive
if_exists (Literal['replace', 'append']) – If the archive already exists, one of ‘replace’ or ‘append’
- Returns:
Zip archive path
- parsons.utilities.zip_archive.unzip_archive(archive_path, destination=None)[source]¶
Unzip an archive. Only returns the path of the first file in the archive.
- Parameters:
archive_path (str) – Path to the ZIP archive
destination (str) – Optional; path to unzip the archive into; if not specified, the
- Returns:
Extracted file path.
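The replace/append choice and the "first file only" return value can be illustrated with the standard zipfile module. Both helpers below are hypothetical sketches rather than the library's code; in particular, extracting to a fresh temporary directory when no destination is given is an assumption, since the description of that default is cut off above.

```python
import os
import tempfile
import zipfile


def create_archive_sketch(archive_path, file_path, file_name=None, if_exists="replace"):
    """Write file_path into a ZIP archive, replacing or appending to an existing one."""
    mode = "a" if if_exists == "append" else "w"
    with zipfile.ZipFile(archive_path, mode) as archive:
        archive.write(file_path, arcname=file_name or os.path.basename(file_path))
    return archive_path


def unzip_archive_sketch(archive_path, destination=None):
    """Extract everything and return the path of the first file in the archive."""
    destination = destination or tempfile.mkdtemp()  # assumed default
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(destination)
        first_name = archive.namelist()[0]
    return os.path.join(destination, first_name)


workdir = tempfile.mkdtemp()
source = os.path.join(workdir, "a.txt")
with open(source, "w") as handle:
    handle.write("hello")

archive_path = create_archive_sketch(os.path.join(workdir, "out.zip"), source)
create_archive_sketch(archive_path, source, file_name="b.txt", if_exists="append")
extracted = unzip_archive_sketch(archive_path)
print(extracted)
```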
dbt Utilities
Core methods for running dbt commands.
- parsons.utilities.dbt.dbt.run_dbt_commands(commands: str | list[str], dbt_project_directory: Path, dbt_profile_directory: Path | None = None, loggers: list[dbtLogger | type[dbtLogger]] | None = None) list[Manifest][source]¶
Executes dbt commands within a directory, optionally logs results.
    from pathlib import Path

    from parsons.utilities.dbt import (
        run_dbt_commands,
        dbtLoggerSlack,
        dbtLoggerPython,
    )

    results = run_dbt_commands(
        commands=["dbt run", "dbt test"],
        dbt_project_directory=Path("/path/to/dbt/project"),
        loggers=[dbtLoggerPython, dbtLoggerSlack],
    )
- Parameters:
commands (str | list[str]) – A single dbt command as a string or a list of dbt commands to be executed.
dbt_project_directory (Path) – The path to the dbt project directory where the commands will be executed.
dbt_profile_directory (Path | None) – Optional. The path to find the dbt profile.
loggers (list[dbtLogger | type[dbtLogger]] | None) – A list of logger instances or logger classes. Defaults to None. If classes are provided, they will be instantiated. Each logger should have a send method that takes the dbt command results as an argument.
- Returns:
A list of result objects from the executed dbt commands.
- Return type:
list[Manifest]
Logging classes for use with Parsons dbt utility.
- class parsons.utilities.dbt.logging.dbtLogger[source]¶
Abstract base class for aggregating logs from dbt commands.
- abstractmethod send(manifests: list[Manifest]) None[source]¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
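The pattern described here, configure first and receive results later, can be sketched without Parsons installed. Everything below is illustrative: LoggerBase stands in for dbtLogger, and ListLogger and run_with_loggers are hypothetical names, not part of the library.

```python
from abc import ABC, abstractmethod


class LoggerBase(ABC):
    """Stand-in for dbtLogger so the sketch runs on its own."""

    @abstractmethod
    def send(self, manifests):
        """Receive the results once the commands have finished."""


class ListLogger(LoggerBase):
    """Hypothetical logger that is configured up front and fed results later."""

    def __init__(self, prefix="dbt"):
        # Options are fixed at construction time, before any command has run.
        self.prefix = prefix
        self.lines = []

    def send(self, manifests):
        # Results arrive only here, after the commands have completed.
        for manifest in manifests:
            self.lines.append(f"{self.prefix}: {manifest}")


def run_with_loggers(results, loggers):
    """Hypothetical runner: instantiate logger classes if needed, then call send."""
    for logger in loggers:
        instance = logger() if isinstance(logger, type) else logger
        instance.send(results)
```

Passing an instance rather than a class is what lets a logger carry credentials or options into the run.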
- class parsons.utilities.dbt.logging.dbtLoggerMarkdown[source]¶
Formats dbt results into a structured Markdown summary.
- format_result() str[source]¶
Aggregates results from multiple dbt commands into a single report, determining an overall ‘worst-case’ status for the header.
- Return type:
str
- abstractmethod send(manifests: list[Manifest]) None¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
- class parsons.utilities.dbt.logging.dbtLoggerStdout[source]¶
- send(manifests: list[Manifest]) None[source]¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
- class parsons.utilities.dbt.logging.dbtLoggerPython[source]¶
- send(manifests: list[Manifest]) None[source]¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
- class parsons.utilities.dbt.logging.dbtLoggerSlack(slack_webhook: str, slack_channel: str | None = None)[source]¶
- send(manifests: list[Manifest]) None[source]¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
- class parsons.utilities.dbt.logging.dbtLoggerDatabase(database_connector: DatabaseConnector, destination_table_runs: str, destination_table_nodes: str, extra_run_table_fields: dict, **copy_kwargs)[source]¶
Log dbt artifacts by loading to a database.
This class is an abstract base class for logging dbt artifacts to a database.
- Parameters:
database_connector (DatabaseConnector)
destination_table_runs (str)
destination_table_nodes (str)
extra_run_table_fields (dict)
- format_command_result(manifest: Manifest) tuple[Table, Table][source]¶
Loads all artifact results into a Parsons Table.
- format_result() tuple[Table, Table][source]¶
Returns a table for the dbt runs and a table for the node runs.
- send(manifests: list[Manifest]) None[source]¶
The send method is called to execute logging.
manifests are passed to this method directly (rather than on initialization) so that the logger class can be initialized before the dbt commands have been run. This is mostly necessary for loggers that need to be initialized with credentials or options before being provided to the run_dbt_commands method.
Pydantic data models for use with dbt utilities.
- class parsons.utilities.dbt.models.Manifest(command: str, run_execution_result: RunExecutionResult | None = None, *, dbt_manifest: RunExecutionResult | None = None)[source]¶
A wrapper for dbt execution results.
- Parameters:
command (str)
run_execution_result (RunExecutionResult | None)
dbt_manifest (RunExecutionResult | None)
- filter_results(**kwargs) list[NodeResult][source]¶
Subset of NodeResults based on filter.
- Return type:
list[NodeResult]
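One plausible reading of a keyword-based filter is attribute equality: a result is kept only if every keyword matches the attribute of the same name. The sketch below illustrates that reading only. Result and filter_results_sketch are hypothetical stand-ins, and the matching rule is an assumption, not a description of how Manifest.filter_results is implemented.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for a node result."""

    name: str
    status: str


def filter_results_sketch(results, **kwargs):
    """Keep results whose attributes equal every keyword argument (assumed rule)."""
    missing = object()  # Sentinel so an absent attribute never matches.
    return [
        result
        for result in results
        if all(getattr(result, key, missing) == value for key, value in kwargs.items())
    ]
```

With no keyword arguments the sketch returns every result unchanged.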
- property dbt_manifest: RunExecutionResult¶
Legacy proxy to new attribute.
- property overall_status: NodeStatus¶
Determine the overall state of the command.
Returns a member of the NodeStatus Enum: Error, Warn, Skipped, or Success.
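A "worst-case" status of this kind can be computed by checking statuses in order of severity and returning the first one present. The sketch below is illustrative only: Status is a stand-in for NodeStatus, and both the severity order (Error, Warn, Skipped, Success) and the Success fallback for an empty input are assumptions.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for NodeStatus, using the four names listed above."""

    ERROR = "error"
    WARN = "warn"
    SKIPPED = "skipped"
    SUCCESS = "success"


# Assumption: severity follows the order Error, Warn, Skipped, Success.
SEVERITY = [Status.ERROR, Status.WARN, Status.SKIPPED, Status.SUCCESS]


def overall_status_sketch(node_statuses):
    """Return the most severe status present, or SUCCESS when there are none."""
    present = set(node_statuses)
    for status in SEVERITY:
        if status in present:
            return status
    return Status.SUCCESS
```

A single failing node is enough to mark the whole command as Error under this rule.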
- class parsons.utilities.dbt.models.EnhancedNodeResult(status: RunStatus | TestStatus | FreshnessStatus, timing: List[TimingInfo], thread_id: str, execution_time: float, adapter_response: Dict[str, Any], message: str | None, failures: int | None, node: AnalysisNode | FunctionNode | SingularTestNode | HookNode | ModelNode | SqlNode | GenericTestNode | SnapshotNode | UnitTestNode | SeedNode | SourceDefinition)[source]¶
- Parameters:
status (RunStatus | TestStatus | FreshnessStatus)
timing (List[TimingInfo])
thread_id (str)
execution_time (float)
message (str | None)
failures (int | None)
node (AnalysisNode | FunctionNode | SingularTestNode | HookNode | ModelNode | SqlNode | GenericTestNode | SnapshotNode | UnitTestNode | SeedNode | SourceDefinition)