import datetime
import logging
import pickle
import random
import uuid
from contextlib import contextmanager
from typing import List, Optional, Union
import google
import petl
from google.cloud import bigquery, exceptions
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.job import LoadJobConfig
from google.oauth2.credentials import Credentials
from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file
logger = logging.getLogger(__name__)
BIGQUERY_TYPE_MAP = {
"str": "STRING",
"float": "FLOAT",
"int": "INTEGER",
"bool": "BOOLEAN",
"datetime": "DATETIME",
"date": "DATE",
"time": "TIME",
"NoneType": "STRING",
"UUID": "STRING",
"timestamp": "TIMESTAMP",
"Decimal": "FLOAT",
}
# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000
def parse_table_name(table_name):
# Helper function to parse out the different components of a table ID
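    # e.g. "myproject.mydataset.mytable" ->
    #     {"project": "myproject", "dataset": "mydataset", "table": "mytable"}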
parts = table_name.split(".")
parts.reverse()
parsed = {
"project": None,
"dataset": None,
"table": None,
}
if len(parts) > 0:
parsed["table"] = parts[0]
if len(parts) > 1:
parsed["dataset"] = parts[1]
if len(parts) > 2:
parsed["project"] = parts[2]
return parsed
def ends_with_semicolon(query: str) -> str:
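    """Return ``query`` with surrounding whitespace stripped and a trailing
    semicolon appended if one is missing."""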
query = query.strip()
if query[-1] == ";":
return query
return query + ";"
def map_column_headers_to_schema_field(schema_definition: list) -> list:
"""
Loops through a list of dictionaries and instantiates
google.cloud.bigquery.SchemaField objects. Useful docs
from Google's API can be found here:
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField
`Args`:
schema_definition: list
This function expects a list of dictionaries in the following format:
```
schema_definition = [
{
"name": column_name,
"field_type": [INTEGER, STRING, FLOAT, etc.]
},
{
"name": column_name,
"field_type": [INTEGER, STRING, FLOAT, etc.],
"mode": "REQUIRED"
},
{
"name": column_name,
"field_type": [INTEGER, STRING, FLOAT, etc.],
"default_value_expression": CURRENT_TIMESTAMP()
}
]
```
`Returns`:
List of instantiated `SchemaField` objects
"""
# TODO - Better way to test for this
if isinstance(schema_definition[0], bigquery.SchemaField):
logger.debug("User supplied list of SchemaField objects")
return schema_definition
return [bigquery.SchemaField(**x) for x in schema_definition]
class GoogleBigQuery(DatabaseConnector):
"""
    Class for querying BigQuery tables and returning the data as Parsons tables.
    This class requires application credentials in the form of a JSON. They can be
    passed in the following ways:
* Set an environmental variable named ``GOOGLE_APPLICATION_CREDENTIALS`` with the
local path to the credentials json.
      Example: ``GOOGLE_APPLICATION_CREDENTIALS='path/to/creds.json'``
* Pass in the path to the credentials using the ``app_creds`` argument.
* Pass in a json string using the ``app_creds`` argument.
Args:
        app_creds: str
            A credentials JSON string, a dict, a path to a JSON file, or a
            google.oauth2 ``Credentials`` object. Not required if the
            ``GOOGLE_APPLICATION_CREDENTIALS`` env variable is set.
        project: str
            The project which the client acts on behalf of. If not passed,
            the project inferred from the environment will be used.
location: str
Default geographic location for tables
client_options: dict
A dictionary containing any requested client options. Defaults to the required
scopes for making API calls against External tables stored in Google Drive.
Can be set to None if these permissions are not desired
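
    **Example**

    A minimal usage sketch; the credentials path and query below are illustrative
    placeholders.

    .. code-block:: python

        # Placeholder credentials path and query.
        bq = GoogleBigQuery(app_creds="path/to/creds.json")
        tbl = bq.query("SELECT 1 AS one")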
"""
def __init__(
self,
app_creds: Optional[Union[str, dict, Credentials]] = None,
project=None,
location=None,
client_options: dict = {
"scopes": [
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/cloud-platform",
]
},
):
self.app_creds = app_creds
if isinstance(app_creds, Credentials):
self.credentials = app_creds
else:
self.env_credential_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=self.env_credential_path
)
self.credentials = load_google_application_credentials(self.env_credential_path)
self.project = project
self.location = location
self.client_options = client_options
# We will not create the client until we need to use it, since creating the client
# without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
# This attribute will be used to hold the client once we have created it.
self._client = None
self._dbapi = dbapi
self.dialect = "bigquery"
@property
def client(self):
"""
Get the Google BigQuery client to use for making queries.
`Returns:`
`google.cloud.bigquery.client.Client`
"""
if not self._client:
# Create a BigQuery client to use to make the query
self._client = bigquery.Client(
project=self.project,
location=self.location,
client_options=self.client_options,
credentials=self.credentials,
)
return self._client
@contextmanager
def connection(self):
"""
Generate a BigQuery connection.
The connection is set up as a python "context manager", so it will be closed
automatically when the connection goes out of scope. Note that the BigQuery
API uses jobs to run database operations and, as such, simply has a no-op for
a "commit" function.
If you would like to manage transactions, please use multi-statement queries
as [outlined here](https://cloud.google.com/bigquery/docs/transactions)
or utilize the `query_with_transaction` method on this class.
When using the connection, make sure to put it in a ``with`` block (necessary for
any context manager):
``with bq.connection() as conn:``
`Returns:`
Google BigQuery ``connection`` object
"""
conn = self._dbapi.connect(self.client)
try:
yield conn
finally:
conn.close()
@contextmanager
def cursor(self, connection):
cur = connection.cursor()
try:
yield cur
finally:
cur.close()
def query(
self,
sql: str,
parameters: Optional[Union[list, dict]] = None,
return_values: bool = True,
) -> Optional[Table]:
"""
Run a BigQuery query and return the results as a Parsons table.
        To include Python variables in your query, it is recommended to pass them as
        parameters. The examples below use the DB-API placeholder styles (``%s`` and
        ``%(name)s``), which the BigQuery client converts to query parameters.
        Using the ``parameters`` argument ensures that values are escaped properly, and
        avoids SQL injection attacks.
**Parameter Examples**
.. code-block:: python
name = "Beatrice O'Brady"
sql = 'SELECT * FROM my_table WHERE name = %s'
            bq.query(sql, parameters=[name])
.. code-block:: python
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %(name)s"
            bq.query(sql, parameters={'name': name})
`Args:`
            sql: str
                A valid BigQuery SQL statement
            parameters: list or dict
                A list or dictionary of query parameters for BigQuery.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
with self.connection() as connection:
return self.query_with_connection(
sql, connection, parameters=parameters, return_values=return_values
)
def query_with_connection(
self, sql, connection, parameters=None, commit=True, return_values: bool = True
):
"""
Execute a query against the BigQuery database, with an existing connection.
Useful for batching queries together. Will return ``None`` if the query
returns zero rows.
`Args:`
sql: str
A valid SQL statement
connection: obj
                A connection object obtained from ``bigquery.connection()``
parameters: list
A list of python variables to be converted into SQL values in your query
            commit: boolean
                Must be True. The BigQuery API client always auto-commits; to wrap
                multiple statements in a transaction, use ``query_with_transaction``.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
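
        **Example**

        A minimal sketch; the dataset and table names below are illustrative
        placeholders.

        .. code-block:: python

            # Placeholder dataset and table names.
            with bq.connection() as conn:
                bq.query_with_connection(
                    "CREATE TABLE my_dataset.my_table AS SELECT 1 AS id",
                    conn,
                    return_values=False,
                )
                tbl = bq.query_with_connection("SELECT id FROM my_dataset.my_table", conn)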
"""
if not commit:
raise ValueError(
"""
BigQuery implementation uses an API client which always auto-commits.
If you wish to wrap multiple queries in a transaction, use
                multi-statement transactions within a single query as outlined
here: https://cloud.google.com/bigquery/docs/transactions or use the
`query_with_transaction` method on the BigQuery connector.
"""
)
# get our connection and cursor
with self.cursor(connection) as cursor:
# Run the query
cursor.execute(sql, parameters)
if not return_values:
return None
# This applies when running a SQL statement without any return value
# e.g. when creating a view or a table
# This does not apply when 0 rows are returned
if not cursor.description:
return None
final_table = self._fetch_query_results(cursor=cursor)
return final_table
def query_with_transaction(self, queries, parameters=None):
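        """
        Run a list of SQL statements inside a single BigQuery multi-statement
        transaction, so that either every statement commits or none do.
        `Args:`
            queries: list
                A list of SQL statements to run, in order, within the transaction
            parameters: dict
                Query parameters passed through to ``query``
        """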
queries_with_semicolons = [ends_with_semicolon(q) for q in queries]
queries_on_newlines = "\n".join(queries_with_semicolons)
queries_wrapped = f"""
BEGIN
BEGIN TRANSACTION;
{queries_on_newlines}
COMMIT TRANSACTION;
END;
"""
self.query(sql=queries_wrapped, parameters=parameters, return_values=False)
def copy_from_gcs(
self,
gcs_blob_uri: str,
table_name: str,
if_exists: str = "fail",
max_errors: int = 0,
data_type: str = "csv",
csv_delimiter: str = ",",
ignoreheader: int = 1,
nullas: Optional[str] = None,
allow_quoted_newlines: bool = True,
allow_jagged_rows: bool = True,
quote: Optional[str] = None,
schema: Optional[List[dict]] = None,
job_config: Optional[LoadJobConfig] = None,
force_unzip_blobs: bool = False,
compression_type: str = "gzip",
new_file_extension: str = "csv",
template_table: Optional[str] = None,
**load_kwargs,
):
"""
Copy a csv saved in Google Cloud Storage into Google BigQuery.
`Args:`
gcs_blob_uri: str
The GoogleCloudStorage URI referencing the file to be copied.
table_name: str
The table name to load the data into. Will be used to generate load schema
if no custom schema or template table are supplied and the if_exists is
set to "truncate" or "append".
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table. This maps to `write_disposition` in the
`LoadJobConfig` class.
max_errors: int
The maximum number of rows that can error and be skipped before
the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
data_type: str
Denotes whether target file is a JSON or CSV
csv_delimiter: str
Character used to separate values in the target file
ignoreheader: int
                Treats the specified number of rows as a file header and does not load them
nullas: str
Loads fields that match null_string as NULL, where null_string can be any string
allow_quoted_newlines: bool
If True, detects quoted new line characters within a CSV field and does
not interpret the quoted new line character as a row boundary
allow_jagged_rows: bool
Allow missing trailing optional columns (CSV only).
quote: str
The value that is used to quote data sections in a CSV file.
BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of
the encoded string to split the data in its raw, binary state.
schema: list
BigQuery expects a list of dictionaries in the following format
```
schema = [
{"name": "column_name", "type": STRING},
{"name": "another_column_name", "type": INT}
]
```
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided. Note
if there are any conflicts between the job_config and other parameters, the
job_config values are preferred.
force_unzip_blobs: bool
If True, target blobs will be unzipped before being loaded to BigQuery.
compression_type: str
Accepts `zip` or `gzip` values to differentially unzip a compressed
blob in cloud storage.
new_file_extension: str
Provides a file extension if a blob is decompressed and rewritten
to cloud storage.
template_table: str
                Table name to be used as the load schema. The load operation will use the same
                columns and data types as the template table.
**load_kwargs: kwargs
Other arguments to pass to the underlying load_table_from_uri
call on the BigQuery client.
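
        **Example**

        A minimal sketch; the bucket, blob, and table names below are illustrative
        placeholders.

        .. code-block:: python

            # Placeholder GCS URI and destination table.
            bq.copy_from_gcs(
                gcs_blob_uri="gs://my-bucket/my-data.csv",
                table_name="my_dataset.my_table",
                if_exists="truncate",
            )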
"""
self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
job_config = self._process_job_config(
job_config=job_config,
destination_table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
csv_delimiter=csv_delimiter,
ignoreheader=ignoreheader,
nullas=nullas,
allow_quoted_newlines=allow_quoted_newlines,
allow_jagged_rows=allow_jagged_rows,
quote=quote,
custom_schema=schema,
template_table=template_table,
)
# load CSV from Cloud Storage into BigQuery
table_ref = self.get_table_ref(table_name=table_name)
try:
if force_unzip_blobs:
return self.copy_large_compressed_file_from_gcs(
gcs_blob_uri=gcs_blob_uri,
table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
csv_delimiter=csv_delimiter,
ignoreheader=ignoreheader,
nullas=nullas,
allow_quoted_newlines=allow_quoted_newlines,
quote=quote,
schema=schema,
job_config=job_config,
compression_type=compression_type,
new_file_extension=new_file_extension,
)
else:
return self._load_table_from_uri(
source_uris=gcs_blob_uri,
destination=table_ref,
job_config=job_config,
**load_kwargs,
)
except exceptions.BadRequest as e:
if "one of the files is larger than the maximum allowed size." in str(e):
logger.debug(
f"{gcs_blob_uri.split('/')[-1]} exceeds max size ... \
running decompression function..."
)
return self.copy_large_compressed_file_from_gcs(
gcs_blob_uri=gcs_blob_uri,
table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
csv_delimiter=csv_delimiter,
ignoreheader=ignoreheader,
nullas=nullas,
allow_quoted_newlines=allow_quoted_newlines,
quote=quote,
schema=schema,
job_config=job_config,
compression_type=compression_type,
new_file_extension=new_file_extension,
)
elif "Schema has no field" in str(e):
logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file")
return "Empty file"
else:
raise e
def copy_large_compressed_file_from_gcs(
self,
gcs_blob_uri: str,
table_name: str,
if_exists: str = "fail",
max_errors: int = 0,
data_type: str = "csv",
csv_delimiter: str = ",",
ignoreheader: int = 1,
nullas: Optional[str] = None,
allow_quoted_newlines: bool = True,
allow_jagged_rows: bool = True,
quote: Optional[str] = None,
schema: Optional[List[dict]] = None,
job_config: Optional[LoadJobConfig] = None,
compression_type: str = "gzip",
new_file_extension: str = "csv",
template_table: Optional[str] = None,
**load_kwargs,
):
"""
Copy a compressed CSV file that exceeds the maximum size in Google Cloud Storage
into Google BigQuery.
`Args:`
gcs_blob_uri: str
The GoogleCloudStorage URI referencing the file to be copied.
table_name: str
The table name to load the data into. Will be used to generate load schema
if no custom schema or template table are supplied and the if_exists is
set to "truncate" or "append".
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table. This maps to `write_disposition` in the
`LoadJobConfig` class.
max_errors: int
The maximum number of rows that can error and be skipped before
the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
data_type: str
Denotes whether target file is a JSON or CSV
csv_delimiter: str
Character used to separate values in the target file
ignoreheader: int
                Treats the specified number of rows as a file header and does not load them
nullas: str
Loads fields that match null_string as NULL, where null_string can be any string
allow_quoted_newlines: bool
If True, detects quoted new line characters within a CSV field
and does not interpret the quoted new line character as a row boundary
allow_jagged_rows: bool
Allow missing trailing optional columns (CSV only).
quote: str
The value that is used to quote data sections in a CSV file.
BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of
the encoded string to split the data in its raw, binary state.
schema: list
BigQuery expects a list of dictionaries in the following format
```
schema = [
{"name": "column_name", "type": STRING},
{"name": "another_column_name", "type": INT}
]
```
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided. Note
if there are any conflicts between the job_config and other parameters, the
job_config values are preferred.
compression_type: str
Accepts `zip` or `gzip` values to differentially unzip a compressed
blob in cloud storage.
new_file_extension: str
Provides a file extension if a blob is decompressed and rewritten to cloud storage.
template_table: str
                Table name to be used as the load schema. The load operation will use the same
                columns and data types as the template table.
**load_kwargs: kwargs
Other arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
"""
self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
job_config = self._process_job_config(
job_config=job_config,
destination_table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
csv_delimiter=csv_delimiter,
ignoreheader=ignoreheader,
nullas=nullas,
allow_quoted_newlines=allow_quoted_newlines,
allow_jagged_rows=allow_jagged_rows,
quote=quote,
custom_schema=schema,
template_table=template_table,
)
# TODO - See if this inheritance is happening in other places
gcs = GoogleCloudStorage(app_creds=self.app_creds, project=self.project)
old_bucket_name, old_blob_name = gcs.split_uri(gcs_uri=gcs_blob_uri)
uncompressed_gcs_uri = None
try:
logger.debug("Unzipping large file")
uncompressed_gcs_uri = gcs.unzip_blob(
bucket_name=old_bucket_name,
blob_name=old_blob_name,
new_file_extension=new_file_extension,
compression_type=compression_type,
)
logger.debug(f"Loading uncompressed uri into BigQuery {uncompressed_gcs_uri}...")
table_ref = self.get_table_ref(table_name=table_name)
return self._load_table_from_uri(
source_uris=uncompressed_gcs_uri,
destination=table_ref,
job_config=job_config,
**load_kwargs,
)
finally:
if uncompressed_gcs_uri:
new_bucket_name, new_blob_name = gcs.split_uri(gcs_uri=uncompressed_gcs_uri)
gcs.delete_blob(new_bucket_name, new_blob_name)
logger.debug("Successfully dropped uncompressed blob")
def copy_s3(
self,
table_name,
bucket,
key,
if_exists: str = "fail",
max_errors: int = 0,
data_type: str = "csv",
csv_delimiter: str = ",",
ignoreheader: int = 1,
nullas: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
gcs_client: Optional[GoogleCloudStorage] = None,
tmp_gcs_bucket: Optional[str] = None,
template_table: Optional[str] = None,
job_config: Optional[LoadJobConfig] = None,
**load_kwargs,
):
"""
Copy a file from s3 to BigQuery.
`Args:`
table_name: str
                The schema and table name (``tmc.cool_table``) to load the file into.
bucket: str
The s3 bucket where the file or manifest is located.
key: str
The key of the file or manifest in the s3 bucket.
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
max_errors: int
The maximum number of rows that can error and be skipped before
the job fails.
data_type: str
The data type of the file. Only ``csv`` supported currently.
csv_delimiter: str
The delimiter of the ``csv``. Only relevant if data_type is ``csv``.
ignoreheader: int
The number of header rows to skip. Ignored if data_type is ``json``.
nullas: str
                Loads fields that match this string as NULL
aws_access_key_id:
An AWS access key granted to the bucket where the file is located. Not required
if keys are stored as environmental variables.
aws_secret_access_key:
An AWS secret access key granted to the bucket where the file is located. Not
required if keys are stored as environmental variables.
gcs_client: object
The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
tmp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
template_table: str
                Table name to be used as the load schema. The load operation will use the same
                columns and data types as the template table.
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided. Note
if there are any conflicts between the job_config and other parameters, the
job_config values are preferred.
`Returns`
Parsons Table or ``None``
See :ref:`parsons-table` for output options.
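
        **Example**

        A minimal sketch; the bucket, key, and table names below are illustrative
        placeholders, and AWS keys are assumed to be set in the environment.

        .. code-block:: python

            # Placeholder S3 bucket/key and destination table.
            bq.copy_s3(
                table_name="my_dataset.my_table",
                bucket="my-s3-bucket",
                key="exports/my-data.csv",
                tmp_gcs_bucket="my-temp-gcs-bucket",
                if_exists="truncate",
            )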
"""
# copy from S3 to GCS
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_uri = gcs_client.copy_s3_to_gcs(
aws_source_bucket=bucket,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
gcs_sink_bucket=tmp_gcs_bucket,
aws_s3_key=key,
)
temp_blob_name = key
temp_blob_uri = gcs_client.format_uri(bucket=tmp_gcs_bucket, name=temp_blob_name)
# load CSV from Cloud Storage into BigQuery
try:
return self.copy_from_gcs(
gcs_blob_uri=temp_blob_uri,
table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
csv_delimiter=csv_delimiter,
ignoreheader=ignoreheader,
nullas=nullas,
job_config=job_config,
template_table=template_table,
**load_kwargs,
)
finally:
gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
def copy(
self,
tbl: Table,
table_name: str,
if_exists: str = "fail",
max_errors: int = 0,
tmp_gcs_bucket: Optional[str] = None,
gcs_client: Optional[GoogleCloudStorage] = None,
job_config: Optional[LoadJobConfig] = None,
template_table: Optional[str] = None,
ignoreheader: int = 1,
nullas: Optional[str] = None,
allow_quoted_newlines: bool = True,
allow_jagged_rows: bool = True,
quote: Optional[str] = None,
schema: Optional[List[dict]] = None,
**load_kwargs,
):
"""
Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.
`Args:`
tbl: obj
The Parsons Table to copy into BigQuery.
table_name: str
The table name to load the data into. Will be used to generate load schema
if no custom schema or template table are supplied and if_exists is
set to "truncate" or "append".
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
max_errors: int
The maximum number of rows that can error and be skipped before
the job fails.
tmp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
gcs_client: object
The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided.
template_table: str
                Table name to be used as the load schema. The load operation will use the same
                columns and data types as the template table.
**load_kwargs: kwargs
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
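
        **Example**

        A minimal sketch; the rows, temp bucket, and table name below are
        illustrative placeholders.

        .. code-block:: python

            from parsons import Table

            # Placeholder rows and destination table.
            tbl = Table([{"id": 1, "name": "Jane"}])
            bq.copy(tbl, "my_dataset.my_table", if_exists="append",
                    tmp_gcs_bucket="my-temp-gcs-bucket")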
"""
data_type = "csv"
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
if not tmp_gcs_bucket:
raise ValueError(
"Must set GCS_TEMP_BUCKET environment variable or pass in tmp_gcs_bucket parameter"
)
self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
job_config = self._process_job_config(
job_config=job_config,
destination_table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
data_type=data_type,
template_table=template_table,
parsons_table=tbl,
ignoreheader=ignoreheader,
nullas=nullas,
allow_quoted_newlines=allow_quoted_newlines,
allow_jagged_rows=allow_jagged_rows,
quote=quote,
custom_schema=schema,
)
# Reorder schema to match table to ensure compatibility
schema = []
for column in tbl.columns:
try:
schema_row = [i for i in job_config.schema if i.name.lower() == column.lower()][0]
except IndexError:
raise IndexError(f"Column found in Table that was not found in schema: {column}")
schema.append(schema_row)
job_config.schema = schema
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_name = f"{uuid.uuid4()}.{data_type}"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)
# load CSV from Cloud Storage into BigQuery
try:
self._load_table_from_uri(
source_uris=temp_blob_uri,
destination=self.get_table_ref(table_name=table_name),
job_config=job_config,
**load_kwargs,
)
finally:
gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
def duplicate_table(
self,
source_table,
destination_table,
if_exists="fail",
drop_source_table=False,
):
"""
Create a copy of an existing table (or subset of rows) in a new
table.
`Args:`
source_table: str
Name of existing schema and table (e.g. ``myschema.oldtable``)
destination_table: str
Name of destination schema and table (e.g. ``myschema.newtable``)
if_exists: str
If the table already exists, either ``fail``, ``replace``, or
``ignore`` the operation.
drop_source_table: boolean
Drop the source table
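
        **Example**

        A minimal sketch; the table names below are illustrative placeholders.

        .. code-block:: python

            # Placeholder source and destination tables.
            bq.duplicate_table(
                source_table="my_dataset.old_table",
                destination_table="my_dataset.new_table",
                if_exists="replace",
            )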
"""
if if_exists not in ["fail", "replace", "ignore"]:
raise ValueError("Invalid value for `if_exists` argument")
if if_exists == "fail" and self.table_exists(destination_table):
raise ValueError("Table already exists.")
table__replace_clause = "OR REPLACE " if if_exists == "replace" else ""
table__exists_clause = " IF NOT EXISTS" if if_exists == "ignore" else ""
query = f"""
CREATE {table__replace_clause}TABLE{table__exists_clause}
{destination_table}
CLONE {source_table}
"""
self.query(sql=query, return_values=False)
if drop_source_table:
self.delete_table(table_name=source_table)
def upsert(
self,
table_obj,
target_table,
primary_key,
distinct_check=True,
cleanup_temp_table=True,
from_s3=False,
**copy_args,
):
"""
        Perform an upsert on an existing table. An upsert is an operation in which rows
        in a table are updated if they already exist and inserted if they do not.
`Args:`
table_obj: obj
A Parsons table object
target_table: str
The schema and table name to upsert
primary_key: str or list
The primary key column(s) of the target table
distinct_check: boolean
Check if the primary key column is distinct. Raise error if not.
cleanup_temp_table: boolean
A temp table is dropped by default on cleanup. You can set to False for debugging.
from_s3: boolean
Instead of specifying a table_obj (set the first argument to None),
set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3`
arguments to upsert a pre-existing s3 file into the target_table
\**copy_args: kwargs
See :func:`~parsons.databases.bigquery.BigQuery.copy` for options.
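
        **Example**

        A minimal sketch; the Parsons table, target table, and key column below
        are illustrative placeholders.

        .. code-block:: python

            # Placeholder table object, target table, and primary key.
            bq.upsert(parsons_tbl, "my_dataset.target_table", primary_key="id")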
""" # noqa: W605
if not self.table_exists(target_table):
logger.info(
"Target table does not exist. Copying into newly \
created target table."
)
self.copy(table_obj, target_table)
return None
if isinstance(primary_key, str):
primary_keys = [primary_key]
else:
primary_keys = primary_key
if distinct_check:
primary_keys_statement = ", ".join(primary_keys)
diff = self.query(
f"""
select (
select count(*)
from {target_table}
) - (
SELECT COUNT(*) from (
select distinct {primary_keys_statement}
from {target_table}
)
) as total_count
"""
).first
if diff > 0:
raise ValueError("Primary key column contains duplicate values.")
noise = f"{random.randrange(0, 10000):04}"[:4]
date_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
        # Generate a staging table name like "table_stg_20200210_1230_1421"
staging_tbl = f"{target_table}_stg_{date_stamp}_{noise}"
# Copy to a staging table
logger.info(f"Building staging table: {staging_tbl}")
if from_s3:
if table_obj is not None:
raise ValueError(
"upsert(... from_s3=True) requires the first argument (table_obj)"
" to be None. from_s3 and table_obj are mutually exclusive."
)
self.copy_s3(staging_tbl, template_table=target_table, **copy_args)
else:
self.copy(
tbl=table_obj,
table_name=staging_tbl,
template_table=target_table,
**copy_args,
)
# Delete rows
comparisons = [
f"`{staging_tbl}`.{primary_key} = `{target_table}`.{primary_key}"
for primary_key in primary_keys
]
where_clause = " and ".join(comparisons)
queries = [
f"""
DELETE FROM `{target_table}`
WHERE EXISTS
(SELECT * FROM `{staging_tbl}`
WHERE {where_clause})
""",
f"""
INSERT INTO `{target_table}`
SELECT * FROM `{staging_tbl}`
""",
]
try:
return self.query_with_transaction(queries=queries)
finally:
if cleanup_temp_table:
logger.info(f"Deleting staging table: {staging_tbl}")
self.query(f"DROP TABLE IF EXISTS {staging_tbl}", return_values=False)
def delete_table(self, table_name):
"""
Delete a BigQuery table.
`Args:`
table_name: str
The name of the table to delete.
"""
table_ref = self.get_table_ref(table_name=table_name)
self.client.delete_table(table_ref)
def table_exists(self, table_name: str) -> bool:
"""
Check whether or not the Google BigQuery table exists in the specified dataset.
`Args:`
table_name: str
The name of the BigQuery table to check for
`Returns:`
bool
True if the table exists in the specified dataset, false otherwise
"""
table_ref = self.get_table_ref(table_name=table_name)
try:
self.client.get_table(table_ref)
except exceptions.NotFound:
return False
return True
def get_tables(self, schema, table_name: Optional[str] = None):
"""
List the tables in a schema including metadata.
Args:
schema: str
Filter by a schema
table_name: str
Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.debug("Retrieving tables info.")
sql = f"select * from {schema}.INFORMATION_SCHEMA.TABLES"
if table_name:
sql += f" where table_name = '{table_name}'"
return self.query(sql)
def get_views(self, schema, view: Optional[str] = None):
"""
List views.
Args:
schema: str
Filter by a schema
view: str
Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.debug("Retrieving views info.")
sql = f"""
select
table_schema as schema_name,
table_name as view_name,
view_definition
from {schema}.INFORMATION_SCHEMA.VIEWS
"""
if view:
sql += f" where table_name = '{view}'"
return self.query(sql)
def get_columns(self, schema: str, table_name: str):
"""
Gets the column names (and other column metadata) for a table. If you
need just the column names run ``get_columns_list()``, as it is faster.
`Args:`
schema: str
The schema name
table_name: str
The table name
`Returns:`
            A dictionary mapping each column name to a dictionary of extra info. The
            keys of the outer dictionary are ordered just like the columns in the table.
            The extra info is a dict with the keys ``data_type``, ``is_nullable``,
            ``is_updatable``, ``is_partioning_column``, and ``rounding_mode``.
"""
base_query = f"""
SELECT
*
FROM `{schema}.INFORMATION_SCHEMA.COLUMNS`
WHERE
table_name = '{table_name}'
"""
logger.debug(base_query)
return {
row["column_name"]: {
"data_type": row["data_type"],
"is_nullable": row["is_nullable"],
"is_updatable": row["is_updatable"],
"is_partioning_column": row["is_partitioning_column"],
"rounding_mode": row["rounding_mode"],
}
for row in self.query(base_query)
}
def get_columns_list(self, schema: str, table_name: str) -> list:
"""
Gets the column names for a table.
`Args:`
schema: str
The schema name
table_name: str
The table name
`Returns:`
A list of column names
"""
first_row = self.query(f"SELECT * FROM {schema}.{table_name} LIMIT 1;")
return [x for x in first_row.columns]
def get_row_count(self, schema: str, table_name: str) -> int:
"""
Gets the row count for a BigQuery materialization.
Caution: This method uses SELECT COUNT(*) which can be expensive for large tables,
especially those with many columns. This is because BigQuery scans all table data
to perform the count, even though only the row count is returned.
`Args`:
schema: str
The schema name
table_name: str
The table name
`Returns:`
Row count of the target table
"""
sql = f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`"
result = self.query(sql=sql)
return result["row_count"][0]
def get_table_ref(self, table_name):
# Helper function to build a TableReference for our table
parsed = parse_table_name(table_name)
dataset_ref = self.client.dataset(parsed["dataset"])
return dataset_ref.table(parsed["table"])
def _get_job_config_schema(
self,
job_config: LoadJobConfig,
destination_table_name: str,
if_exists: str,
parsons_table: Optional[Table] = None,
custom_schema: Optional[list] = None,
template_table: Optional[str] = None,
) -> Optional[List[bigquery.SchemaField]]:
# if job.schema already set in job_config, do nothing
if job_config.schema:
return job_config.schema
# if schema specified by user, convert to schema type and use that
if custom_schema:
return map_column_headers_to_schema_field(custom_schema)
# if template_table specified by user, use that
# otherwise, if loading into existing table, infer destination table as template table
if not template_table and if_exists in ("append", "truncate"):
template_table = destination_table_name
# if template_table set, use it to set the load schema
if template_table:
try:
bigquery_table = self.client.get_table(template_table)
return bigquery_table.schema
except google.api_core.exceptions.NotFound:
logger.warning(
f"template_table '{template_table}' not found. Unable to set schema."
)
# if load is coming from a Parsons table, use that to generate schema
if parsons_table:
return self._generate_schema_from_parsons_table(parsons_table)
return None
def _generate_schema_from_parsons_table(self, tbl):
"""BigQuery schema generation based on contents of Parsons table.
Not usually necessary to use this. BigQuery is able to
natively autodetect schema formats."""
stats = tbl.get_columns_type_stats()
fields = []
for stat in stats:
petl_types = stat["type"]
# Prefer 'str' if included
# Otherwise choose first type that isn't "NoneType"
# Otherwise choose NoneType
not_none_petl_types = [i for i in petl_types if i != "NoneType"]
if "str" in petl_types:
best_type = "str"
elif not_none_petl_types:
best_type = not_none_petl_types[0]
else:
best_type = "NoneType"
# Python datetimes may be datetime or timestamp in BigQuery
# BigQuery datetimes have no timezone, timestamps do
if best_type == "datetime":
for value in petl.util.base.values(tbl.table, stat["name"]):
if isinstance(value, datetime.datetime) and value.tzinfo:
best_type = "timestamp"
try:
field_type = self._bigquery_type(best_type)
except KeyError as e:
raise KeyError(
"Column type not supported for load to BigQuery. "
"Consider converting to another type. "
f"[type={best_type}]"
) from e
field = bigquery.schema.SchemaField(stat["name"], field_type)
fields.append(field)
return fields
def _process_job_config(
self,
destination_table_name: str,
if_exists: str,
max_errors: int,
data_type: str,
csv_delimiter: Optional[str] = ",",
ignoreheader: Optional[int] = 1,
nullas: Optional[str] = None,
allow_quoted_newlines: Optional[bool] = None,
allow_jagged_rows: Optional[bool] = None,
quote: Optional[str] = None,
job_config: Optional[LoadJobConfig] = None,
custom_schema: Optional[list] = None,
template_table: Optional[str] = None,
parsons_table: Optional[Table] = None,
) -> LoadJobConfig:
"""
Internal function to neatly process a user-supplied job configuration object.
As a convention, if both the job_config and keyword arguments specify a value,
we defer to the job_config.
`Args`:
job_config: `LoadJobConfig`
                Optionally supplied BigQuery `LoadJobConfig` object
`Returns`:
A `LoadJobConfig` object
"""
if not job_config:
job_config = bigquery.LoadJobConfig()
job_config.schema = self._get_job_config_schema(
job_config=job_config,
destination_table_name=destination_table_name,
if_exists=if_exists,
parsons_table=parsons_table,
custom_schema=custom_schema,
template_table=template_table,
)
if not job_config.schema:
job_config.autodetect = True
if not job_config.create_disposition:
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
if not job_config.max_bad_records:
job_config.max_bad_records = max_errors
if not job_config.skip_leading_rows and data_type == "csv":
job_config.skip_leading_rows = ignoreheader
if not job_config.source_format:
job_config.source_format = (
bigquery.SourceFormat.CSV
if data_type == "csv"
else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)
if not job_config.field_delimiter:
if data_type == "csv":
job_config.field_delimiter = csv_delimiter
if nullas:
job_config.null_marker = nullas
destination_table_exists = self.table_exists(destination_table_name)
if not job_config.write_disposition:
if if_exists == "append":
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
elif if_exists == "truncate":
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
elif destination_table_exists and if_exists == "fail":
raise Exception("Table already exists.")
elif if_exists == "drop" and destination_table_exists:
self.delete_table(destination_table_name)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
else:
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
if not job_config.allow_quoted_newlines and allow_quoted_newlines is not None:
job_config.allow_quoted_newlines = allow_quoted_newlines
if data_type == "csv" and allow_jagged_rows is not None:
job_config.allow_jagged_rows = allow_jagged_rows
if not job_config.quote_character and quote is not None:
job_config.quote_character = quote
return job_config
def _fetch_query_results(self, cursor) -> Table:
# We will use a temp file to cache the results so that they are not all living
# in memory. We'll use pickle to serialize the results to file in order to maintain
# the proper data types (e.g. integer).
temp_filename = create_temp_file()
with open(temp_filename, "wb") as temp_file:
header = [i[0] for i in cursor.description]
pickle.dump(header, temp_file)
while True:
batch = cursor.fetchmany(QUERY_BATCH_SIZE)
if len(batch) == 0:
break
for row in batch:
row_data = list(row.values())
pickle.dump(row_data, temp_file)
ptable = petl.frompickle(temp_filename)
return Table(ptable)
def _validate_copy_inputs(self, if_exists: str, data_type: str):
if if_exists not in ["fail", "truncate", "append", "drop"]:
raise ValueError(
f"Unexpected value for if_exists: {if_exists}, must be one of "
'"append", "drop", "truncate", or "fail"'
)
if data_type not in ["csv", "json"]:
raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")
def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs):
load_job = self.client.load_table_from_uri(
source_uris=source_uris,
destination=destination,
job_config=job_config,
**load_kwargs,
)
try:
load_job.result()
return load_job
except exceptions.BadRequest as e:
for idx, error_ in enumerate(load_job.errors):
if idx == 0:
logger.error("* Load job failed. Enumerating errors collection below:")
logger.error(f"** Error collection - index {idx}:")
logger.error(error_)
raise e
@staticmethod
def _bigquery_type(tp):
return BIGQUERY_TYPE_MAP[tp]
def table(self, table_name):
        # Return a BigQuery table object
return BigQueryTable(self, table_name)
def extract(
self,
dataset: str,
table_name: str,
gcs_bucket: str,
gcs_blob_name: str,
project: Optional[str] = None,
) -> None:
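        """
        Export a BigQuery table to a blob in Google Cloud Storage and wait for
        the extract job to complete.
        `Args:`
            dataset: str
                The dataset containing the table to export
            table_name: str
                The name of the table to export
            gcs_bucket: str
                The Google Cloud Storage bucket to export into
            gcs_blob_name: str
                The name of the destination blob
            project: str
                The project containing the dataset. Defaults to the client's project.
        """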
dataset_ref = bigquery.DatasetReference(project or self.client.project, dataset)
table_ref = dataset_ref.table(table_name)
gs_destination = f"gs://{gcs_bucket}/{gcs_blob_name}"
extract_job = self.client.extract_table(
table_ref,
gs_destination,
)
extract_job.result() # Waits for job to complete.
logger.info(f"Finished exporting query result to {gs_destination}.")
class BigQueryTable(BaseTable):
"""BigQuery table object."""
def drop(self, cascade=False):
"""
Drop the table.
"""
self.db.delete_table(self.table)
def truncate(self):
"""
Truncate the table.
"""
self.db.query(f"TRUNCATE TABLE {self.table}")