Source code for parsons.google.google_bigquery

import datetime
import logging
import pickle
import random
import uuid
from contextlib import contextmanager
from typing import List, Optional, Union

import google
import petl
from google.cloud import bigquery, exceptions
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.job import LoadJobConfig

from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.google.utitities import setup_google_application_credentials
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lookup from a Python type name (the string form, e.g. "str", "NoneType")
# to the BigQuery column type name to use for it.
# NOTE(review): presumably consumed by the connector's `_bigquery_type` helper
# when a schema is generated from a Parsons table -- confirm against that helper.
BIGQUERY_TYPE_MAP = {
    "str": "STRING",
    "float": "FLOAT",
    "int": "INTEGER",
    "bool": "BOOLEAN",
    "datetime": "DATETIME",
    "date": "DATE",
    "time": "TIME",
    "dict": "RECORD",
    # Columns with no typed values and UUIDs are both stored as strings.
    "NoneType": "STRING",
    "UUID": "STRING",
    "timestamp": "TIMESTAMP",
    "Decimal": "FLOAT",
}

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000


def parse_table_name(table_name):
    """
    Split a dotted table ID into its project, dataset and table components.

    The ID is read from right to left, so ``"table"``, ``"dataset.table"`` and
    ``"project.dataset.table"`` are all accepted. Components that are not
    present are reported as ``None``; anything left of the project is ignored.

    `Args:`
        table_name: str
            The dotted table ID.

    `Returns:`
        dict
            A dictionary with the keys ``project``, ``dataset`` and ``table``.
    """
    components = {"project": None, "dataset": None, "table": None}

    # The right-most segment is always the table, then dataset, then project.
    right_to_left = reversed(table_name.split("."))
    for key, segment in zip(("table", "dataset", "project"), right_to_left):
        components[key] = segment

    return components


def ends_with_semicolon(query: str) -> str:
    """
    Return ``query`` with surrounding whitespace removed and a terminating
    semicolon guaranteed.

    `Args:`
        query: str
            A SQL statement, with or without a trailing semicolon.

    `Returns:`
        str
            The stripped statement. A ``;`` is appended only when the stripped
            statement does not already end with one, so an empty or
            whitespace-only ``query`` yields ``";"``.
    """
    query = query.strip()
    # ``endswith`` is safe on an empty string, whereas indexing ``query[-1]``
    # raises IndexError for empty / whitespace-only input.
    if query.endswith(";"):
        return query
    return query + ";"


def map_column_headers_to_schema_field(schema_definition: list) -> list:
    """
    Convert a schema definition into a list of
    ``google.cloud.bigquery.SchemaField`` objects. Useful docs
    from Google's API can be found here:
        https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField

    `Args`:
        schema_definition: list
            A list whose items are either ``SchemaField`` objects (kept as they
            are) or dictionaries of ``SchemaField`` keyword arguments, in the
            following format:

        ```
        schema_definition = [
            {
                "name": column_name,
                "field_type": [INTEGER, STRING, FLOAT, etc.]
            },
            {
                "name": column_name,
                "field_type": [INTEGER, STRING, FLOAT, etc.],
                "mode": "REQUIRED"
            },
            {
                "name": column_name,
                "field_type": [INTEGER, STRING, FLOAT, etc.],
                "default_value_expression": "CURRENT_TIMESTAMP()"
            }
        ]
        ```

    `Returns`:
        List of instantiated `SchemaField` objects. An empty (or ``None``)
        definition yields an empty list. When every item already is a
        ``SchemaField`` the input list itself is returned.
    """

    # Nothing to convert -- and no first element to inspect.
    if not schema_definition:
        return []

    # Check every item rather than only the first, so a list that mixes
    # SchemaField objects and dictionaries is converted item by item below.
    if all(isinstance(x, bigquery.SchemaField) for x in schema_definition):
        logger.debug("User supplied list of SchemaField objects")
        return schema_definition

    return [
        x if isinstance(x, bigquery.SchemaField) else bigquery.SchemaField(**x)
        for x in schema_definition
    ]


class GoogleBigQuery(DatabaseConnector):
    """
    Class for querying BigQuery table and returning the data as Parsons tables.

    This class requires application credentials in the form of a json. It can be passed
    in the following ways:

    * Set an environmental variable named ``GOOGLE_APPLICATION_CREDENTIALS`` with the
      local path to the credentials json.

      Example: ``GOOGLE_APPLICATION_CREDENTIALS='path/to/creds.json'``

    * Pass in the path to the credentials using the ``app_creds`` argument.

    * Pass in a json string using the ``app_creds`` argument.

    Args:
        app_creds: str
            A credentials json string or a path to a json file. Not required
            if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
        project: str
            The project which the client is acting on behalf of. If not passed
            then will use the default inferred environment.
        location: str
            Default geographic location for tables
        client_options: dict
            A dictionary containing any requested client options. Defaults to the required
            scopes for making API calls against External tables stored in Google Drive.
            Can be set to None if these permissions are not desired
    """

    def __init__(
        self,
        app_creds=None,
        project=None,
        location=None,
        client_options: dict = {
            "scopes": [
                "https://www.googleapis.com/auth/drive",
                "https://www.googleapis.com/auth/bigquery",
                "https://www.googleapis.com/auth/cloud-platform",
            ]
        },
    ):
        # Local import: only needed to isolate the shared default dict below.
        import copy

        self.app_creds = app_creds

        setup_google_application_credentials(app_creds)

        self.project = project
        self.location = location

        # The default value in the signature is one dict object shared by every
        # call. Keep a private copy so that mutating one connector's options
        # (e.g. appending a scope) cannot leak into other instances.
        if isinstance(client_options, dict):
            client_options = copy.deepcopy(client_options)
        self.client_options = client_options

        # We will not create the client until we need to use it, since creating the client
        # without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
        # This attribute will be used to hold the client once we have created it.
        self._client = None

        self._dbapi = dbapi

        self.dialect = "bigquery"

    @property
    def client(self):
        """
        Get the Google BigQuery client to use for making queries.

        The client is created on first access from ``project``, ``location``
        and ``client_options`` and reused afterwards.

        `Returns:`
            `google.cloud.bigquery.client.Client`
        """
        if self._client is None:
            # Create a BigQuery client to use to make the query
            self._client = bigquery.Client(
                project=self.project,
                location=self.location,
                client_options=self.client_options,
            )

        return self._client
@contextmanager
def connection(self):
    """
    Open a BigQuery DB-API connection for the duration of a ``with`` block.

    The connection is built from this connector's client and is always closed
    when the block exits, whether it exits normally or through an exception.

    Note that the BigQuery API uses jobs to run database operations and, as such,
    simply has a no-op for a "commit" function. If you would like to manage
    transactions, please use multi-statement queries as
    [outlined here](https://cloud.google.com/bigquery/docs/transactions)
    or utilize the `query_with_transaction` method on this class.

    Usage: ``with bq.connection() as conn:``

    `Returns:`
        Google BigQuery ``connection`` object
    """
    bq_connection = self._dbapi.connect(self.client)
    try:
        yield bq_connection
    finally:
        # Release the connection no matter how the caller's block ended.
        bq_connection.close()
@contextmanager
def cursor(self, connection):
    """
    Open a cursor on ``connection`` for the duration of a ``with`` block.

    `Args:`
        connection: obj
            A connection object, e.g. one obtained from ``connection()``.

    `Returns:`
        The cursor created by ``connection.cursor()``; it is closed when the
        block exits, including when the block raises.
    """
    open_cursor = connection.cursor()
    try:
        yield open_cursor
    finally:
        open_cursor.close()
def query(
    self,
    sql: str,
    parameters: Optional[Union[list, dict]] = None,
    return_values: bool = True,
) -> Optional[Table]:
    """
    Run a BigQuery query on a fresh connection and return the results as a
    Parsons table.

    To include python variables in your query, pass them through the
    ``parameters`` argument rather than formatting them into the SQL string.
    This ensures that values are escaped properly, and avoids SQL injection
    attacks.

    **Parameter Examples**

    .. code-block:: python

        name = "Beatrice O'Brady"
        sql = 'SELECT * FROM my_table WHERE name = %s'
        bq.query(sql, parameters=[name])

    .. code-block:: python

        name = "Beatrice O'Brady"
        sql = "SELECT * FROM my_table WHERE name = %(name)s"
        bq.query(sql, parameters={'name': name})

    `Args:`
        sql: str
            A valid BigQuery statement
        parameters: list or dict
            Query parameters, positional (list) or named (dict).
        return_values: bool
            Passed through to ``query_with_connection``; when False no result
            table is built and ``None`` is returned.

    `Returns:`
        Parsons Table
            See :ref:`parsons-table` for output options.
    """
    with self.connection() as connection:
        result = self.query_with_connection(
            sql,
            connection,
            parameters=parameters,
            return_values=return_values,
        )
    return result
def query_with_connection(
    self, sql, connection, parameters=None, commit=True, return_values: bool = True
):
    """
    Execute a query against the BigQuery database, with an existing connection.
    Useful for batching queries together.

    Returns ``None`` when ``return_values`` is False or when the statement
    produces no result set (e.g. creating a view or a table). A query that
    matches zero rows still has a result set and is returned as a table.

    `Args:`
        sql: str
            A valid SQL statement
        connection: obj
            A connection object obtained from ``bq.connection()``
        parameters: list or dict
            Python variables to be converted into SQL values in your query
        commit: boolean
            Must be true. BigQuery always auto-commits, so ``False`` is rejected.
        return_values: boolean
            Set to False to skip fetching results.

    `Returns:`
        Parsons Table
            See :ref:`parsons-table` for output options.

    `Raises:`
        ValueError
            If ``commit`` is falsy.
    """

    if not commit:
        # Built from adjacent literals so the message carries no source indentation.
        raise ValueError(
            "BigQuery implementation uses an API client which always auto-commits. "
            "If you wish to wrap multiple queries in a transaction, use "
            "Multi-Statement transactions within a single query as outlined here: "
            "https://cloud.google.com/bigquery/docs/transactions or use the "
            "`query_with_transaction` method on the BigQuery connector."
        )

    # get our connection and cursor
    with self.cursor(connection) as cursor:
        # Run the query
        cursor.execute(sql, parameters)

        if not return_values:
            return None

        # This applies when running a SQL statement without any return value
        # e.g. when creating a view or a table
        # This does not apply when 0 rows are returned
        if not cursor.description:
            return None

        final_table = self._fetch_query_results(cursor=cursor)

    return final_table
def query_with_transaction(self, queries, parameters=None):
    """
    Run several statements as one BigQuery multi-statement transaction.

    Each statement is terminated with a semicolon if needed, the statements
    are joined one per line, and the result is wrapped in a
    ``BEGIN TRANSACTION`` / ``COMMIT TRANSACTION`` script that is sent as a
    single query. No result values are fetched.

    `Args:`
        queries: list
            The SQL statements to run, in order.
        parameters: list or dict
            Query parameters passed through to ``query``.
    """
    statements = "\n".join(ends_with_semicolon(statement) for statement in queries)
    script = f"""
    BEGIN
        BEGIN TRANSACTION;
        {statements}
        COMMIT TRANSACTION;
    END;
    """
    self.query(sql=script, parameters=parameters, return_values=False)
def copy_from_gcs(
    self,
    gcs_blob_uri: str,
    table_name: str,
    if_exists: str = "fail",
    max_errors: int = 0,
    data_type: str = "csv",
    csv_delimiter: str = ",",
    ignoreheader: int = 1,
    nullas: Optional[str] = None,
    allow_quoted_newlines: bool = True,
    allow_jagged_rows: bool = True,
    quote: Optional[str] = None,
    schema: Optional[List[dict]] = None,
    job_config: Optional[LoadJobConfig] = None,
    force_unzip_blobs: bool = False,
    compression_type: str = "gzip",
    new_file_extension: str = "csv",
    template_table: Optional[str] = None,
    **load_kwargs,
):
    """
    Copy a csv saved in Google Cloud Storage into Google BigQuery.

    `Args:`
        gcs_blob_uri: str
            The GoogleCloudStorage URI referencing the file to be copied.
        table_name: str
            The table name to load the data into. Will be used to generate load schema
            if no custom schema or template table are supplied and the if_exists is
            set to "truncate" or "append".
        if_exists: str
            If the table already exists, either ``fail``, ``append``, ``drop``
            or ``truncate`` the table. This maps to `write_disposition` in the
            `LoadJobConfig` class.
        max_errors: int
            The maximum number of rows that can error and be skipped before
            the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
        data_type: str
            Denotes whether target file is a JSON or CSV
        csv_delimiter: str
            Character used to separate values in the target file
        ignoreheader: int
            Treats the specified number_rows as a file header and doesn't load them
        nullas: str
            Loads fields that match null_string as NULL, where null_string can be
            any string
        allow_quoted_newlines: bool
            If True, detects quoted new line characters within a CSV field and does
            not interpret the quoted new line character as a row boundary
        allow_jagged_rows: bool
            Allow missing trailing optional columns (CSV only).
        quote: str
            The value that is used to quote data sections in a CSV file.
            BigQuery converts the string to ISO-8859-1 encoding, and then uses the
            first byte of the encoded string to split the data in its raw, binary state.
        schema: list
            A list of dictionaries of ``SchemaField`` keyword arguments, in the
            following format
            ```
            schema = [
                {"name": "column_name", "field_type": "STRING"},
                {"name": "another_column_name", "field_type": "INTEGER"}
            ]
            ```
        job_config: object
            A LoadJobConfig object to provide to the underlying call to load_table_from_uri
            on the BigQuery client. The function will create its own if not provided. Note
            if there are any conflicts between the job_config and other parameters, the
            job_config values are preferred.
        force_unzip_blobs: bool
            If True, target blobs will be unzipped before being loaded to BigQuery.
        compression_type: str
            Accepts `zip` or `gzip` values to differentially unzip a compressed
            blob in cloud storage.
        new_file_extension: str
            Provides a file extension if a blob is decompressed and rewritten
            to cloud storage.
        template_table: str
            Table name to be used as the load schema. Load operation wil use the same
            columns and data types as the template table.
        **load_kwargs: kwargs
            Other arguments to pass to the underlying load_table_from_uri
            call on the BigQuery client.

    `Returns:`
        The result of the load call, or the string ``"Empty file"`` when BigQuery
        rejects the load with a "Schema has no field" error.
    """
    self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)

    job_config = self._process_job_config(
        job_config=job_config,
        destination_table_name=table_name,
        if_exists=if_exists,
        max_errors=max_errors,
        data_type=data_type,
        csv_delimiter=csv_delimiter,
        ignoreheader=ignoreheader,
        nullas=nullas,
        allow_quoted_newlines=allow_quoted_newlines,
        allow_jagged_rows=allow_jagged_rows,
        quote=quote,
        custom_schema=schema,
        template_table=template_table,
    )

    # load CSV from Cloud Storage into BigQuery
    table_ref = self.get_table_ref(table_name=table_name)

    def _load_decompressed():
        # Single definition of the decompress-then-load call, shared by the forced
        # path and the "file too large" fallback. Every caller-supplied option is
        # forwarded, including allow_jagged_rows, template_table and load_kwargs.
        return self.copy_large_compressed_file_from_gcs(
            gcs_blob_uri=gcs_blob_uri,
            table_name=table_name,
            if_exists=if_exists,
            max_errors=max_errors,
            data_type=data_type,
            csv_delimiter=csv_delimiter,
            ignoreheader=ignoreheader,
            nullas=nullas,
            allow_quoted_newlines=allow_quoted_newlines,
            allow_jagged_rows=allow_jagged_rows,
            quote=quote,
            schema=schema,
            job_config=job_config,
            compression_type=compression_type,
            new_file_extension=new_file_extension,
            template_table=template_table,
            **load_kwargs,
        )

    try:
        if force_unzip_blobs:
            return _load_decompressed()
        return self._load_table_from_uri(
            source_uris=gcs_blob_uri,
            destination=table_ref,
            job_config=job_config,
            **load_kwargs,
        )
    except exceptions.BadRequest as e:
        message = str(e)
        blob_name = gcs_blob_uri.split("/")[-1]
        if "one of the files is larger than the maximum allowed size." in message:
            logger.debug(
                f"{blob_name} exceeds max size ... running decompression function..."
            )
            return _load_decompressed()
        if "Schema has no field" in message:
            logger.debug(f"{blob_name} is empty, skipping file")
            return "Empty file"
        # Anything else is unexpected: re-raise with the original traceback.
        raise
def copy_large_compressed_file_from_gcs(
    self,
    gcs_blob_uri: str,
    table_name: str,
    if_exists: str = "fail",
    max_errors: int = 0,
    data_type: str = "csv",
    csv_delimiter: str = ",",
    ignoreheader: int = 1,
    nullas: Optional[str] = None,
    allow_quoted_newlines: bool = True,
    allow_jagged_rows: bool = True,
    quote: Optional[str] = None,
    schema: Optional[List[dict]] = None,
    job_config: Optional[LoadJobConfig] = None,
    compression_type: str = "gzip",
    new_file_extension: str = "csv",
    template_table: Optional[str] = None,
    **load_kwargs,
):
    """
    Copy a compressed CSV file that exceeds the maximum size in Google Cloud Storage
    into Google BigQuery.

    The blob is first unzipped to a new blob in cloud storage, the uncompressed
    blob is loaded, and the uncompressed blob is deleted afterwards whether or
    not the load succeeded.

    `Args:`
        gcs_blob_uri: str
            The GoogleCloudStorage URI referencing the file to be copied.
        table_name: str
            The table name to load the data into. Will be used to generate load schema
            if no custom schema or template table are supplied and the if_exists is
            set to "truncate" or "append".
        if_exists: str
            If the table already exists, either ``fail``, ``append``, ``drop``
            or ``truncate`` the table. This maps to `write_disposition` in the
            `LoadJobConfig` class.
        max_errors: int
            The maximum number of rows that can error and be skipped before
            the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
        data_type: str
            Denotes whether target file is a JSON or CSV
        csv_delimiter: str
            Character used to separate values in the target file
        ignoreheader: int
            Treats the specified number_rows as a file header and doesn't load them
        nullas: str
            Loads fields that match null_string as NULL, where null_string can be
            any string
        allow_quoted_newlines: bool
            If True, detects quoted new line characters within a CSV field
            and does not interpret the quoted new line character as a row boundary
        allow_jagged_rows: bool
            Allow missing trailing optional columns (CSV only).
        quote: str
            The value that is used to quote data sections in a CSV file.
            BigQuery converts the string to ISO-8859-1 encoding, and then uses the
            first byte of the encoded string to split the data in its raw, binary state.
        schema: list
            A list of dictionaries of ``SchemaField`` keyword arguments, in the
            following format
            ```
            schema = [
                {"name": "column_name", "field_type": "STRING"},
                {"name": "another_column_name", "field_type": "INTEGER"}
            ]
            ```
        job_config: object
            A LoadJobConfig object to provide to the underlying call to load_table_from_uri
            on the BigQuery client. The function will create its own if not provided. Note
            if there are any conflicts between the job_config and other parameters, the
            job_config values are preferred.
        compression_type: str
            Accepts `zip` or `gzip` values to differentially unzip a compressed
            blob in cloud storage.
        new_file_extension: str
            Provides a file extension if a blob is decompressed and rewritten to cloud storage.
        template_table: str
            Table name to be used as the load schema. Load operation wil use the same
            columns and data types as the template table.
        **load_kwargs: kwargs
            Other arguments to pass to the underlying load_table_from_uri call on the BigQuery
            client.
    """
    self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)

    load_config = self._process_job_config(
        job_config=job_config,
        destination_table_name=table_name,
        if_exists=if_exists,
        max_errors=max_errors,
        data_type=data_type,
        csv_delimiter=csv_delimiter,
        ignoreheader=ignoreheader,
        nullas=nullas,
        allow_quoted_newlines=allow_quoted_newlines,
        allow_jagged_rows=allow_jagged_rows,
        quote=quote,
        custom_schema=schema,
        template_table=template_table,
    )

    # TODO - See if this inheritance is happening in other places
    storage = GoogleCloudStorage(app_creds=self.app_creds, project=self.project)
    source_bucket, source_blob = storage.split_uri(gcs_uri=gcs_blob_uri)

    # Stays None until the unzip succeeds, so cleanup only runs when there is
    # actually an uncompressed blob to remove.
    unzipped_uri = None

    try:
        logger.debug("Unzipping large file")
        unzipped_uri = storage.unzip_blob(
            bucket_name=source_bucket,
            blob_name=source_blob,
            new_file_extension=new_file_extension,
            compression_type=compression_type,
        )

        logger.debug(f"Loading uncompressed uri into BigQuery {unzipped_uri}...")
        destination = self.get_table_ref(table_name=table_name)
        return self._load_table_from_uri(
            source_uris=unzipped_uri,
            destination=destination,
            job_config=load_config,
            **load_kwargs,
        )

    finally:
        if unzipped_uri:
            unzipped_bucket, unzipped_blob = storage.split_uri(gcs_uri=unzipped_uri)
            storage.delete_blob(unzipped_bucket, unzipped_blob)
            logger.debug("Successfully dropped uncompressed blob")
def copy_s3(
    self,
    table_name,
    bucket,
    key,
    if_exists: str = "fail",
    max_errors: int = 0,
    data_type: str = "csv",
    csv_delimiter: str = ",",
    ignoreheader: int = 1,
    nullas: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    gcs_client: Optional[GoogleCloudStorage] = None,
    tmp_gcs_bucket: Optional[str] = None,
    template_table: Optional[str] = None,
    job_config: Optional[LoadJobConfig] = None,
    **load_kwargs,
):
    """
    Copy a file from s3 to BigQuery.

    The file is first transferred into a Google Cloud Storage staging bucket,
    loaded from there with ``copy_from_gcs``, and the staged blob is deleted
    afterwards whether or not the load succeeded.

    `Args:`
        table_name: str
            The table name and schema (``tmc.cool_table``) to point the file.
        bucket: str
            The s3 bucket where the file or manifest is located.
        key: str
            The key of the file or manifest in the s3 bucket.
        if_exists: str
            If the table already exists, either ``fail``, ``append``, ``drop``
            or ``truncate`` the table.
        max_errors: int
            The maximum number of rows that can error and be skipped before
            the job fails.
        data_type: str
            The data type of the file. Only ``csv`` supported currently.
        csv_delimiter: str
            The delimiter of the ``csv``. Only relevant if data_type is ``csv``.
        ignoreheader: int
            The number of header rows to skip. Ignored if data_type is ``json``.
        nullas: str
            Loads fields that match string as NULL
        aws_access_key_id:
            An AWS access key granted to the bucket where the file is located. Not required
            if keys are stored as environmental variables.
        aws_secret_access_key:
            An AWS secret access key granted to the bucket where the file is located. Not
            required if keys are stored as environmental variables.
        gcs_client: object
            The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
        tmp_gcs_bucket: str
            The name of the Google Cloud Storage bucket to use to stage the data to load
            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
        template_table: str
            Table name to be used as the load schema. Load operation wil use the same
            columns and data types as the template table.
        job_config: object
            A LoadJobConfig object to provide to the underlying call to load_table_from_uri
            on the BigQuery client. The function will create its own if not provided. Note
            if there are any conflicts between the job_config and other parameters, the
            job_config values are preferred.
        **load_kwargs: kwargs
            Other arguments passed through to ``copy_from_gcs``.

    `Returns`
        Whatever ``copy_from_gcs`` returns for the staged blob.
    """

    # copy from S3 to GCS
    tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
    gcs_client = gcs_client or GoogleCloudStorage()

    # The transfer's return value is not used: this method addresses the staged
    # blob by the S3 key and rebuilds its URI from the bucket and that name.
    gcs_client.copy_s3_to_gcs(
        aws_source_bucket=bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        gcs_sink_bucket=tmp_gcs_bucket,
        aws_s3_key=key,
    )
    temp_blob_name = key
    temp_blob_uri = gcs_client.format_uri(bucket=tmp_gcs_bucket, name=temp_blob_name)

    # load CSV from Cloud Storage into BigQuery
    try:
        return self.copy_from_gcs(
            gcs_blob_uri=temp_blob_uri,
            table_name=table_name,
            if_exists=if_exists,
            max_errors=max_errors,
            data_type=data_type,
            csv_delimiter=csv_delimiter,
            ignoreheader=ignoreheader,
            nullas=nullas,
            job_config=job_config,
            template_table=template_table,
            **load_kwargs,
        )
    finally:
        # Always remove the staged copy, even when the load failed.
        gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
def copy(
    self,
    tbl: Table,
    table_name: str,
    if_exists: str = "fail",
    max_errors: int = 0,
    tmp_gcs_bucket: Optional[str] = None,
    gcs_client: Optional[GoogleCloudStorage] = None,
    job_config: Optional[LoadJobConfig] = None,
    template_table: Optional[str] = None,
    ignoreheader: int = 1,
    nullas: Optional[str] = None,
    allow_quoted_newlines: bool = True,
    allow_jagged_rows: bool = True,
    quote: Optional[str] = None,
    schema: Optional[List[dict]] = None,
    **load_kwargs,
):
    """
    Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.

    The table is uploaded as a temporary csv blob, loaded from there, and the
    blob is deleted afterwards whether or not the load succeeded.

    `Args:`
        tbl: obj
            The Parsons Table to copy into BigQuery.
        table_name: str
            The table name to load the data into. Will be used to generate load schema
            if no custom schema or template table are supplied and if_exists is
            set to "truncate" or "append".
        if_exists: str
            If the table already exists, either ``fail``, ``append``, ``drop``
            or ``truncate`` the table.
        max_errors: int
            The maximum number of rows that can error and be skipped before
            the job fails.
        tmp_gcs_bucket: str
            The name of the Google Cloud Storage bucket to use to stage the data to load
            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
        gcs_client: object
            The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
        job_config: object
            A LoadJobConfig object to provide to the underlying call to load_table_from_uri
            on the BigQuery client. The function will create its own if not provided.
        template_table: str
            Table name to be used as the load schema. Load operation wil use the same
            columns and data types as the template table.
        ignoreheader: int
            Number of header rows to skip; passed to the job configuration.
        nullas: str
            String to load as NULL; passed to the job configuration.
        allow_quoted_newlines: bool
            Passed to the job configuration.
        allow_jagged_rows: bool
            Passed to the job configuration.
        quote: str
            Quote character; passed to the job configuration.
        schema: list
            Custom schema definition; passed to the job configuration.
        **load_kwargs: kwargs
            Arguments to pass to the underlying load_table_from_uri call on the BigQuery
            client.

    `Raises:`
        ValueError
            If no staging bucket is configured.
        IndexError
            If a column of ``tbl`` has no matching field in the load schema.
    """
    data_type = "csv"

    tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
    if not tmp_gcs_bucket:
        raise ValueError(
            "Must set GCS_TEMP_BUCKET environment variable or pass in tmp_gcs_bucket parameter"
        )

    self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)

    job_config = self._process_job_config(
        job_config=job_config,
        destination_table_name=table_name,
        if_exists=if_exists,
        max_errors=max_errors,
        data_type=data_type,
        template_table=template_table,
        parsons_table=tbl,
        ignoreheader=ignoreheader,
        nullas=nullas,
        allow_quoted_newlines=allow_quoted_newlines,
        allow_jagged_rows=allow_jagged_rows,
        quote=quote,
        custom_schema=schema,
    )

    # Reorder schema to match table to ensure compatibility
    ordered_fields = []
    for column in tbl.columns:
        # First schema field whose name matches the column, ignoring case.
        matching_field = next(
            (field for field in job_config.schema if field.name.lower() == column.lower()),
            None,
        )
        if matching_field is None:
            raise IndexError(f"Column found in Table that was not found in schema: {column}")
        ordered_fields.append(matching_field)
    job_config.schema = ordered_fields

    gcs_client = gcs_client or GoogleCloudStorage()
    temp_blob_name = f"{uuid.uuid4()}.{data_type}"
    temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)

    # load CSV from Cloud Storage into BigQuery
    try:
        self._load_table_from_uri(
            source_uris=temp_blob_uri,
            destination=self.get_table_ref(table_name=table_name),
            job_config=job_config,
            **load_kwargs,
        )
    finally:
        gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
def duplicate_table(
    self,
    source_table,
    destination_table,
    if_exists="fail",
    drop_source_table=False,
):
    """
    Clone an existing table into a new table with a ``CREATE TABLE ... CLONE``
    statement.

    `Args:`
        source_table: str
            Name of existing schema and table (e.g. ``myschema.oldtable``)
        destination_table: str
            Name of destination schema and table (e.g. ``myschema.newtable``)
        if_exists: str
            If the table already exists, either ``fail``, ``replace``, or
            ``ignore`` the operation.
        drop_source_table: boolean
            Drop the source table

    `Raises:`
        ValueError
            If ``if_exists`` is not one of the accepted values, or if it is
            ``fail`` and the destination table already exists.
    """
    if if_exists not in ["fail", "replace", "ignore"]:
        raise ValueError("Invalid value for `if_exists` argument")
    if if_exists == "fail" and self.table_exists(destination_table):
        raise ValueError("Table already exists.")

    # At most one of the two optional clauses is used, depending on the mode.
    replace_clause = ""
    not_exists_clause = ""
    if if_exists == "replace":
        replace_clause = "OR REPLACE "
    elif if_exists == "ignore":
        not_exists_clause = " IF NOT EXISTS"

    statement = f"""
        CREATE {replace_clause}TABLE{not_exists_clause}
        {destination_table}
        CLONE {source_table}
    """
    self.query(sql=statement, return_values=False)

    if drop_source_table:
        self.delete_table(table_name=source_table)
def upsert(
    self,
    table_obj,
    target_table,
    primary_key,
    distinct_check=True,
    cleanup_temp_table=True,
    from_s3=False,
    **copy_args,
):
    """
    Preform an upsert on an existing table. An upsert is a function in which rows
    in a table are updated and inserted at the same time.

    If the target table does not exist yet, the data is simply copied into a newly
    created target table. Otherwise the data is loaded into a staging table, and
    in one transaction the matching rows are deleted from the target and the
    staging rows are inserted.

    `Args:`
        table_obj: obj
            A Parsons table object
        target_table: str
            The schema and table name to upsert
        primary_key: str or list
            The primary key column(s) of the target table
        distinct_check: boolean
            Check if the primary key column is distinct. Raise error if not.
        cleanup_temp_table: boolean
            A temp table is dropped by default on cleanup. You can set to False for debugging.
        from_s3: boolean
            Instead of specifying a table_obj (set the first argument to None),
            set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3`
            arguments to upsert a pre-existing s3 file into the target_table
        ``**copy_args``: kwargs
            See :func:`~parsons.databases.bigquery.BigQuery.copy` for options.

    `Raises:`
        ValueError
            If ``from_s3`` is set together with a ``table_obj``, or if the
            distinct check finds duplicate primary key values in the target table.
    """
    # Reject contradictory input before doing any work.
    if from_s3 and table_obj is not None:
        raise ValueError(
            "upsert(... from_s3=True) requires the first argument (table_obj)"
            " to be None. from_s3 and table_obj are mutually exclusive."
        )

    if not self.table_exists(target_table):
        logger.info("Target table does not exist. Copying into newly created target table.")
        # Honour the same source selection and copy options as the staging load
        # below; otherwise S3 input and options such as tmp_gcs_bucket are lost.
        if from_s3:
            self.copy_s3(target_table, **copy_args)
        else:
            self.copy(table_obj, target_table, **copy_args)
        return None

    primary_keys = [primary_key] if isinstance(primary_key, str) else primary_key

    if distinct_check:
        primary_keys_statement = ", ".join(primary_keys)
        # Total rows minus distinct key combinations: anything above zero means
        # the target table holds duplicate keys.
        diff = self.query(
            f"""
            select (
                select count(*)
                from `{target_table}`
            ) - (
                SELECT COUNT(*) from (
                    select distinct {primary_keys_statement}
                    from `{target_table}`
                )
            ) as total_count
            """
        ).first
        if diff > 0:
            raise ValueError("Primary key column contains duplicate values.")

    noise = f"{random.randrange(0, 10000):04}"[:4]
    date_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    # Generate a staging table like "table_stg_20200210_1230_1421"
    staging_tbl = f"{target_table}_stg_{date_stamp}_{noise}"

    # Copy to a staging table
    logger.info(f"Building staging table: {staging_tbl}")

    if from_s3:
        self.copy_s3(staging_tbl, template_table=target_table, **copy_args)
    else:
        self.copy(
            tbl=table_obj,
            table_name=staging_tbl,
            template_table=target_table,
            **copy_args,
        )

    # Delete rows
    comparisons = [
        f"`{staging_tbl}`.{key_column} = `{target_table}`.{key_column}"
        for key_column in primary_keys
    ]
    where_clause = " and ".join(comparisons)

    queries = [
        f"""
            DELETE FROM `{target_table}`
            WHERE EXISTS
            (SELECT * FROM `{staging_tbl}`
            WHERE {where_clause})
            """,
        f"""
            INSERT INTO `{target_table}`
            SELECT * FROM `{staging_tbl}`
            """,
    ]

    try:
        return self.query_with_transaction(queries=queries)
    finally:
        if cleanup_temp_table:
            logger.info(f"Deleting staging table: {staging_tbl}")
            # Quoted like the statements above so the same table names work here too.
            self.query(f"DROP TABLE IF EXISTS `{staging_tbl}`", return_values=False)
def delete_table(self, table_name):
    """
    Delete a BigQuery table.

    The name is resolved to a table reference with ``get_table_ref`` and the
    reference is handed to the client's ``delete_table``.

    `Args:`
        table_name: str
            The name of the table to delete.
    """
    reference = self.get_table_ref(table_name=table_name)
    bq_client = self.client
    bq_client.delete_table(reference)
def table_exists(self, table_name: str) -> bool:
    """
    Check whether or not the Google BigQuery table exists in the specified dataset.

    The table is looked up through the client; a ``NotFound`` error from that
    lookup is reported as ``False``, any other error propagates.

    `Args:`
        table_name: str
            The name of the BigQuery table to check for

    `Returns:`
        bool
            True if the table exists in the specified dataset, false otherwise
    """
    reference = self.get_table_ref(table_name=table_name)
    try:
        self.client.get_table(reference)
    except exceptions.NotFound:
        return False
    else:
        return True
def get_tables(self, schema, table_name: Optional[str] = None):
    """
    List the tables in a schema including metadata.

    Reads ``INFORMATION_SCHEMA.TABLES`` of the given schema, optionally
    restricted to a single table name.

    Args:
        schema: str
            Filter by a schema
        table_name: str
            Filter by a table name

    `Returns:`
        Parsons Table
            See :ref:`parsons-table` for output options.
    """
    logger.debug("Retrieving tables info.")

    statement = f"select * from {schema}.INFORMATION_SCHEMA.TABLES"
    if table_name:
        statement = f"{statement} where table_name = '{table_name}'"

    return self.query(statement)
def get_views(self, schema, view: Optional[str] = None):
    """
    List views.

    Reads schema name, view name and view definition from
    ``INFORMATION_SCHEMA.VIEWS`` of the given schema, optionally restricted
    to a single view name.

    Args:
        schema: str
            Filter by a schema
        view: str
            Filter by a view name

    `Returns:`
        Parsons Table
            See :ref:`parsons-table` for output options.
    """
    logger.debug("Retrieving views info.")

    base_statement = f"""
          select
            table_schema as schema_name,
            table_name as view_name,
            view_definition
          from {schema}.INFORMATION_SCHEMA.VIEWS
          """
    name_filter = f" where table_name = '{view}'" if view else ""

    return self.query(base_statement + name_filter)
def get_columns(self, schema: str, table_name: str):
    """
    Gets the column names (and other column metadata) for a table. If you
    need just the column names run ``get_columns_list()``, as it is faster.

    `Args:`
        schema: str
            The schema name
        table_name: str
            The table name

    `Returns:`
        A dictionary mapping column name to a dictionary with extra info. The
        keys of the dictionary follow the order of the rows returned by the
        ``INFORMATION_SCHEMA.COLUMNS`` query. The extra info is a dict with the
        keys ``data_type``, ``is_nullable``, ``is_updatable``,
        ``is_partitioning_column`` and ``rounding_mode``. The misspelled key
        ``is_partioning_column`` carries the same value as
        ``is_partitioning_column`` and is kept for backwards compatibility.
    """

    base_query = f"""
        SELECT
            *
        FROM `{schema}.INFORMATION_SCHEMA.COLUMNS`
        WHERE
            table_name = '{table_name}'
    """

    logger.debug(base_query)

    return {
        row["column_name"]: {
            "data_type": row["data_type"],
            "is_nullable": row["is_nullable"],
            "is_updatable": row["is_updatable"],
            "is_partitioning_column": row["is_partitioning_column"],
            # Legacy misspelling, kept so existing callers keep working.
            "is_partioning_column": row["is_partitioning_column"],
            "rounding_mode": row["rounding_mode"],
        }
        for row in self.query(base_query)
    }
def get_columns_list(self, schema: str, table_name: str) -> list:
    """
    Gets the column names for a table.

    A single row is selected from the table and the column names of the
    resulting table are returned.

    `Args:`
        schema: str
            The schema name
        table_name: str
            The table name

    `Returns:`
        A list of column names
    """
    sample = self.query(f"SELECT * FROM {schema}.{table_name} LIMIT 1;")
    return list(sample.columns)
def get_row_count(self, schema: str, table_name: str) -> int:
    """
    Gets the row count for a BigQuery materialization.

    Runs a ``COUNT(*)`` against the table and returns the first value of the
    ``row_count`` column of the result.

    `Args`:
        schema: str
            The schema name
        table_name: str
            The table name

    `Returns:`
        Row count of the target table
    """
    counted = self.query(sql=f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`")
    row_counts = counted["row_count"]
    return row_counts[0]
def get_table_ref(self, table_name): # Helper function to build a TableReference for our table parsed = parse_table_name(table_name) dataset_ref = self.client.dataset(parsed["dataset"]) return dataset_ref.table(parsed["table"]) def _get_job_config_schema( self, job_config: LoadJobConfig, destination_table_name: str, if_exists: str, parsons_table: Optional[Table] = None, custom_schema: Optional[list] = None, template_table: Optional[str] = None, ) -> Optional[List[bigquery.SchemaField]]: # if job.schema already set in job_config, do nothing if job_config.schema: return job_config.schema # if schema specified by user, convert to schema type and use that if custom_schema: return map_column_headers_to_schema_field(custom_schema) # if template_table specified by user, use that # otherwise, if loading into existing table, infer destination table as template table if not template_table and if_exists in ("append", "truncate"): template_table = destination_table_name # if template_table set, use it to set the load schema if template_table: try: bigquery_table = self.client.get_table(template_table) return bigquery_table.schema except google.api_core.exceptions.NotFound: logger.warning( f"template_table '{template_table}' not found. Unable to set schema." ) # if load is coming from a Parsons table, use that to generate schema if parsons_table: return self._generate_schema_from_parsons_table(parsons_table) return None def _generate_schema_from_parsons_table(self, tbl): """BigQuery schema generation based on contents of Parsons table. Not usually necessary to use this. 
BigQuery is able to natively autodetect schema formats.""" stats = tbl.get_columns_type_stats() fields = [] for stat in stats: petl_types = stat["type"] # Prefer 'str' if included # Otherwise choose first type that isn't "NoneType" # Otherwise choose NoneType not_none_petl_types = [i for i in petl_types if i != "NoneType"] if "str" in petl_types: best_type = "str" elif not_none_petl_types: best_type = not_none_petl_types[0] else: best_type = "NoneType" # Python datetimes may be datetime or timestamp in BigQuery # BigQuery datetimes have no timezone, timestamps do if best_type == "datetime": for value in petl.util.base.values(tbl.table, stat["name"]): if isinstance(value, datetime.datetime) and value.tzinfo: best_type = "timestamp" field_type = self._bigquery_type(best_type) field = bigquery.schema.SchemaField(stat["name"], field_type) fields.append(field) return fields def _process_job_config( self, destination_table_name: str, if_exists: str, max_errors: int, data_type: str, csv_delimiter: Optional[str] = ",", ignoreheader: Optional[int] = 1, nullas: Optional[str] = None, allow_quoted_newlines: Optional[bool] = None, allow_jagged_rows: Optional[bool] = None, quote: Optional[str] = None, job_config: Optional[LoadJobConfig] = None, custom_schema: Optional[list] = None, template_table: Optional[str] = None, parsons_table: Optional[Table] = None, ) -> LoadJobConfig: """ Internal function to neatly process a user-supplied job configuration object. As a convention, if both the job_config and keyword arguments specify a value, we defer to the job_config. 
`Args`: job_config: `LoadJobConfig` Optionally supplied GCS `LoadJobConfig` object `Returns`: A `LoadJobConfig` object """ if not job_config: job_config = bigquery.LoadJobConfig() job_config.schema = self._get_job_config_schema( job_config=job_config, destination_table_name=destination_table_name, if_exists=if_exists, parsons_table=parsons_table, custom_schema=custom_schema, template_table=template_table, ) if not job_config.schema: job_config.autodetect = True if not job_config.create_disposition: job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED if not job_config.max_bad_records: job_config.max_bad_records = max_errors if not job_config.skip_leading_rows and data_type == "csv": job_config.skip_leading_rows = ignoreheader if not job_config.source_format: job_config.source_format = ( bigquery.SourceFormat.CSV if data_type == "csv" else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON ) if not job_config.field_delimiter: if data_type == "csv": job_config.field_delimiter = csv_delimiter if nullas: job_config.null_marker = nullas destination_table_exists = self.table_exists(destination_table_name) if not job_config.write_disposition: if if_exists == "append": job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND elif if_exists == "truncate": job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE elif destination_table_exists and if_exists == "fail": raise Exception("Table already exists.") elif if_exists == "drop" and destination_table_exists: self.delete_table(destination_table_name) job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY else: job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY if not job_config.allow_quoted_newlines and allow_quoted_newlines is not None: job_config.allow_quoted_newlines = allow_quoted_newlines if data_type == "csv" and allow_jagged_rows is not None: job_config.allow_jagged_rows = allow_jagged_rows if not job_config.quote_character and quote is not 
None: job_config.quote_character = quote return job_config def _fetch_query_results(self, cursor) -> Table: # We will use a temp file to cache the results so that they are not all living # in memory. We'll use pickle to serialize the results to file in order to maintain # the proper data types (e.g. integer). temp_filename = create_temp_file() with open(temp_filename, "wb") as temp_file: header = [i[0] for i in cursor.description] pickle.dump(header, temp_file) while True: batch = cursor.fetchmany(QUERY_BATCH_SIZE) if len(batch) == 0: break for row in batch: row_data = list(row.values()) pickle.dump(row_data, temp_file) ptable = petl.frompickle(temp_filename) return Table(ptable) def _validate_copy_inputs(self, if_exists: str, data_type: str): if if_exists not in ["fail", "truncate", "append", "drop"]: raise ValueError( f"Unexpected value for if_exists: {if_exists}, must be one of " '"append", "drop", "truncate", or "fail"' ) if data_type not in ["csv", "json"]: raise ValueError(f"Only supports csv or json files [data_type = {data_type}]") def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs): try: load_job = self.client.load_table_from_uri( source_uris=source_uris, destination=destination, job_config=job_config, **load_kwargs, ) load_job.result() return load_job except exceptions.BadRequest as e: for idx, error_ in enumerate(load_job.errors): if idx == 0: logger.error("* Load job failed. Enumerating errors collection below:") logger.error(f"** Error collection - index {idx}:") logger.error(error_) raise e @staticmethod def _bigquery_type(tp): return BIGQUERY_TYPE_MAP[tp] def table(self, table_name): # Return a MySQL table object return BigQueryTable(self, table_name)
class BigQueryTable(BaseTable):
    """Table object whose operations are carried out through a BigQuery connector."""

    def drop(self, cascade=False):
        """
        Drop the table by calling the connector's ``delete_table``.

        `Args:`
            cascade: bool
                Accepted but not used by this implementation.
        """
        delete_table = self.db.delete_table
        delete_table(self.table)

    def truncate(self):
        """
        Truncate the table by running a ``TRUNCATE TABLE`` statement through
        the connector's ``query``.
        """
        run_query = self.db.query
        run_query(f"TRUNCATE TABLE {self.table}")