import pickle
from typing import Optional, Union
import uuid
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud import exceptions
import petl
from parsons.databases.table import BaseTable
from parsons.databases.database_connector import DatabaseConnector
from parsons.etl import Table
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file
BIGQUERY_TYPE_MAP = {
"str": "STRING",
"float": "FLOAT",
"int": "INTEGER",
"bool": "BOOLEAN",
"datetime.datetime": "DATETIME",
"datetime.date": "DATE",
"datetime.time": "TIME",
"dict": "RECORD",
"NoneType": "STRING",
}
# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000
def get_table_ref(client, table_name):
# Helper function to build a TableReference for our table
parsed = parse_table_name(table_name)
dataset_ref = client.dataset(parsed["dataset"])
return dataset_ref.table(parsed["table"])
def parse_table_name(table_name):
# Helper function to parse out the different components of a table ID
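# For example, "my-project.my_dataset.my_table" parses to
# {"project": "my-project", "dataset": "my_dataset", "table": "my_table"},
# while "my_dataset.my_table" leaves "project" as None.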
parts = table_name.split(".")
parts.reverse()
parsed = {
"project": None,
"dataset": None,
"table": None,
}
if len(parts) > 0:
parsed["table"] = parts[0]
if len(parts) > 1:
parsed["dataset"] = parts[1]
if len(parts) > 2:
parsed["project"] = parts[2]
return parsed
class GoogleBigQuery(DatabaseConnector):
"""
Class for querying BigQuery tables and returning the data as Parsons tables.
This class requires application credentials in the form of a JSON string or file. They
can be passed in the following ways:
* Set an environment variable named ``GOOGLE_APPLICATION_CREDENTIALS`` with the
local path to the credentials JSON.
Example: ``GOOGLE_APPLICATION_CREDENTIALS='path/to/creds.json'``
* Pass in the path to the credentials using the ``app_creds`` argument.
* Pass in a json string using the ``app_creds`` argument.
Args:
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed,
the project will be inferred from the environment.
location: str
Default geographic location for tables
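**Usage Example**
A minimal sketch, assuming ``GOOGLE_APPLICATION_CREDENTIALS`` is set and that a table
named ``my_dataset.my_table`` already exists in the default project:
.. code-block:: python
from parsons import GoogleBigQuery
bq = GoogleBigQuery()
tbl = bq.query("SELECT * FROM my_dataset.my_table LIMIT 10")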
"""
def __init__(self, app_creds=None, project=None, location=None):
self.app_creds = app_creds
setup_google_application_credentials(app_creds)
self.project = project
self.location = location
# We will not create the client until we need to use it, since creating the client
# without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
# This attribute will be used to hold the client once we have created it.
self._client = None
self._dbapi = dbapi
self.dialect = "bigquery"
def copy(
self,
tbl: Table,
table_name: str,
if_exists: str = "fail",
tmp_gcs_bucket: Optional[str] = None,
gcs_client: Optional[GoogleCloudStorage] = None,
job_config: Optional[LoadJobConfig] = None,
**load_kwargs,
):
"""
Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.
`Args:`
tbl: obj
The Parsons Table to copy into BigQuery.
table_name: str
The table name to load the data into.
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
tmp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if the ``GCS_TEMP_BUCKET`` environment variable is not set.
gcs_client: object
The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided.
**load_kwargs: kwargs
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
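**Usage Example**
A minimal sketch, assuming ``bq`` is a ``GoogleBigQuery`` instance and ``my-temp-bucket``
is a Cloud Storage bucket the credentials can write to:
.. code-block:: python
from parsons import Table
people = Table([{"first": "Beatrice", "last": "O'Brady"}])
bq.copy(people, "my_dataset.people", if_exists="append", tmp_gcs_bucket="my-temp-bucket")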
"""
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
if if_exists not in ["fail", "truncate", "append", "drop"]:
raise ValueError(
f"Unexpected value for if_exists: {if_exists}, must be one of "
'"append", "drop", "truncate", or "fail"'
)
table_exists = self.table_exists(table_name)
if not job_config:
job_config = bigquery.LoadJobConfig()
if not job_config.schema:
job_config.schema = self._generate_schema(tbl)
if not job_config.create_disposition:
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
if table_exists:
if if_exists == "fail":
raise ValueError("Table already exists.")
elif if_exists == "drop":
self.delete_table(table_name)
elif if_exists == "append":
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
elif if_exists == "truncate":
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
gcs_client = gcs_client or GoogleCloudStorage()
temp_blob_name = f"{uuid.uuid4()}.csv"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)
# load CSV from Cloud Storage into BigQuery
table_ref = get_table_ref(self.client, table_name)
try:
load_job = self.client.load_table_from_uri(
temp_blob_uri,
table_ref,
job_config=job_config,
**load_kwargs,
)
load_job.result()
finally:
gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
def delete_table(self, table_name):
"""
Delete a BigQuery table.
`Args:`
table_name: str
The name of the table to delete.
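**Usage Example**
A minimal sketch, assuming ``bq`` is a ``GoogleBigQuery`` instance:
.. code-block:: python
bq.delete_table("my_dataset.my_table")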
"""
table_ref = get_table_ref(self.client, table_name)
self.client.delete_table(table_ref)
def query(
self, sql: str, parameters: Optional[Union[list, dict]] = None
) -> Optional[Table]:
"""
Run a BigQuery query and return the results as a Parsons table.
To include Python variables in your query, it is recommended to pass them via the
``parameters`` argument using DB-API placeholders (``%s`` with a list of parameters,
``%(name)s`` with a dictionary), which are converted to BigQuery query parameters.
Using the ``parameters`` argument ensures that values are escaped properly and protects
against SQL injection.
**Parameter Examples**
.. code-block:: python
name = "Beatrice O'Brady"
sql = 'SELECT * FROM my_table WHERE name = %s'
bq.query(sql, parameters=[name])
.. code-block:: python
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %(name)s"
bq.query(sql, parameters={'name': name})
`Args:`
sql: str
A valid BigQuery SQL statement
parameters: list or dict
A list or dictionary of query parameters for BigQuery.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
# get our connection and cursor
cursor = self._dbapi.connect(self.client).cursor()
# Run the query
cursor.execute(sql, parameters)
# We will use a temp file to cache the results so that they are not all living
# in memory. We'll use pickle to serialize the results to file in order to maintain
# the proper data types (e.g. integer).
temp_filename = create_temp_file()
wrote_header = False
with open(temp_filename, "wb") as temp_file:
# Track whether we got data, since if we don't get any results we need to return None
got_results = False
while True:
batch = cursor.fetchmany(QUERY_BATCH_SIZE)
if len(batch) == 0:
break
got_results = True
for row in batch:
# Make sure we write out the header once and only once
if not wrote_header:
wrote_header = True
header = list(row.keys())
pickle.dump(header, temp_file)
row_data = list(row.values())
pickle.dump(row_data, temp_file)
if not got_results:
return None
ptable = petl.frompickle(temp_filename)
final_table = Table(ptable)
return final_table
def table_exists(self, table_name: str) -> bool:
"""
Check whether or not the Google BigQuery table exists in the specified dataset.
`Args:`
table_name: str
The name of the BigQuery table to check for
`Returns:`
bool
True if the table exists in the specified dataset, False otherwise
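**Usage Example**
A minimal sketch, assuming ``bq`` is a ``GoogleBigQuery`` instance and ``tbl`` is a
Parsons table to load if the target is missing (names are illustrative):
.. code-block:: python
if not bq.table_exists("my_dataset.my_table"):
    bq.copy(tbl, "my_dataset.my_table", tmp_gcs_bucket="my-temp-bucket")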
"""
table_ref = get_table_ref(self.client, table_name)
try:
self.client.get_table(table_ref)
except exceptions.NotFound:
return False
return True
@property
def client(self):
"""
Get the Google BigQuery client to use for making queries.
`Returns:`
`google.cloud.bigquery.client.Client`
"""
if not self._client:
# Create a BigQuery client to use to make the query
self._client = bigquery.Client(project=self.project, location=self.location)
return self._client
def _generate_schema(self, tbl):
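# Infer a BigQuery schema from the table's per-column type stats: if any value in a
# column is a str, fall back to STRING; otherwise map the first observed Python type
# via BIGQUERY_TYPE_MAP.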
stats = tbl.get_columns_type_stats()
fields = []
for stat in stats:
petl_types = stat["type"]
best_type = "str" if "str" in petl_types else petl_types[0]
field_type = self._bigquery_type(best_type)
field = bigquery.schema.SchemaField(stat["name"], field_type)
fields.append(field)
return fields
@staticmethod
def _bigquery_type(tp):
return BIGQUERY_TYPE_MAP[tp]
def table(self, table_name):
# Return a BigQuery table object
return BigQueryTable(self, table_name)
class BigQueryTable(BaseTable):
# BigQuery table object.
def drop(self, cascade=False):
"""
Drop the table.
"""
self.db.delete_table(self.table)
def truncate(self):
"""
Truncate the table.
"""
# BigQuery does not support truncate natively, so we "load" an empty Parsons table
# with a write disposition of "truncate"
table_ref = get_table_ref(self.db.client, self.table)
bq_table = self.db.client.get_table(table_ref)
# BigQuery wants the schema when we load the data, so we will grab it from the table
job_config = bigquery.LoadJobConfig()
job_config.schema = bq_table.schema
empty_table = Table([])
self.db.copy(
empty_table, self.table, if_exists="truncate", job_config=job_config
)