# Source code for parsons.google.google_bigquery

import pickle
import uuid

from google.cloud import bigquery
from google.cloud.bigquery import dbapi
from google.cloud import exceptions
import petl

from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

BIGQUERY_TYPE_MAP = {
    'str': 'STRING',
    'float': 'FLOAT',
    'int': 'INTEGER',
    'bool': 'BOOLEAN',
    'datetime.datetime': 'DATETIME',
    'datetime.date': 'DATE',
    'datetime.time': 'TIME',
    'dict': 'RECORD',
}

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000


def get_table_ref(client, table_name):
    # Helper function to build a TableReference for our table
    parsed = parse_table_name(table_name)
    dataset_ref = client.dataset(parsed['dataset'])
    return dataset_ref.table(parsed['table'])


def parse_table_name(table_name):
    # Helper function to parse out the different components of a table ID
    parts = table_name.split('.')
    parts.reverse()
    parsed = {
        'project': None,
        'dataset': None,
        'table': None,
    }
    if len(parts) > 0:
        parsed['table'] = parts[0]
    if len(parts) > 1:
        parsed['dataset'] = parts[1]
    if len(parts) > 2:
        parsed['project'] = parts[2]
    return parsed
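
# Illustrative examples of the helper above (the names are placeholders):
#
#     parse_table_name('my-project.my_dataset.my_table')
#     # -> {'project': 'my-project', 'dataset': 'my_dataset', 'table': 'my_table'}
#
#     parse_table_name('my_dataset.my_table')
#     # -> {'project': None, 'dataset': 'my_dataset', 'table': 'my_table'}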


class GoogleBigQuery:
    """
    Class for querying BigQuery table and returning the data as Parsons tables.

    This class requires application credentials in the form of a json. It can be passed
    in the following ways:

    * Set an environment variable named ``GOOGLE_APPLICATION_CREDENTIALS`` with the
      local path to the credentials json.

      Example: ``GOOGLE_APPLICATION_CREDENTIALS='path/to/creds.json'``

    * Pass in the path to the credentials using the ``app_creds`` argument.

    * Pass in a json string using the ``app_creds`` argument.

    Args:
        app_creds: str
            A credentials json string or a path to a json file. Not required
            if the ``GOOGLE_APPLICATION_CREDENTIALS`` env variable is set.
        project: str
            The project which the client is acting on behalf of. If not passed
            then will use the default inferred environment.
        location: str
            Default geographic location for tables
    """

    def __init__(self, app_creds=None, project=None, location=None):
        self.app_creds = app_creds

        setup_google_application_credentials(app_creds)

        self.project = project
        self.location = location

        # We will not create the client until we need to use it, since creating the client
        # without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
        # This attribute will be used to hold the client once we have created it.
        self._client = None

        self._dbapi = dbapi

        self.dialect = 'bigquery'

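    # Illustrative usage sketch (not part of the class): the credentials path and
    # project name below are placeholders.
    #
    #     bq = GoogleBigQuery(app_creds='path/to/creds.json', project='my-project')
    #
    # No bigquery.Client is created at this point; the client is built lazily the
    # first time the `client` property is touched by a query or copy.
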
    def copy(self, table_obj, table_name, if_exists='fail',
             tmp_gcs_bucket=None, gcs_client=None, job_config=None, **load_kwargs):
        """
        Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.

        `Args:`
            table_obj: obj
                The Parsons Table to copy into BigQuery.
            table_name: str
                The table name to load the data into.
            if_exists: str
                If the table already exists, either ``fail``, ``append``, ``drop``
                or ``truncate`` the table.
            tmp_gcs_bucket: str
                The name of the Google Cloud Storage bucket to use to stage the data to
                load into BigQuery. Required if the ``GCS_TEMP_BUCKET`` environment
                variable is not set.
            gcs_client: object
                The GoogleCloudStorage Connector to use for loading data into Google
                Cloud Storage.
            job_config: object
                A LoadJobConfig object to provide to the underlying call to
                load_table_from_uri on the BigQuery client. The function will create
                its own if not provided.
            **load_kwargs: kwargs
                Arguments to pass to the underlying load_table_from_uri call on the
                BigQuery client.
        """
        tmp_gcs_bucket = check_env.check('GCS_TEMP_BUCKET', tmp_gcs_bucket)

        if if_exists not in ['fail', 'truncate', 'append', 'drop']:
            raise ValueError(f'Unexpected value for if_exists: {if_exists}, must be one of '
                             '"append", "drop", "truncate", or "fail"')

        table_exists = self.table_exists(table_name)

        if not job_config:
            job_config = bigquery.LoadJobConfig()
        if not job_config.schema:
            job_config.schema = self._generate_schema(table_obj)
        if not job_config.create_disposition:
            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY

        if table_exists:
            if if_exists == 'fail':
                raise ValueError('Table already exists.')
            elif if_exists == 'drop':
                self.delete_table(table_name)
            elif if_exists == 'append':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
            elif if_exists == 'truncate':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

        gcs_client = gcs_client or GoogleCloudStorage()
        temp_blob_name = f'{uuid.uuid4()}.csv'
        temp_blob_uri = gcs_client.upload_table(table_obj, tmp_gcs_bucket, temp_blob_name)

        # load CSV from Cloud Storage into BigQuery
        table_ref = get_table_ref(self.client, table_name)
        try:
            load_job = self.client.load_table_from_uri(
                temp_blob_uri, table_ref,
                job_config=job_config, **load_kwargs,
            )
            load_job.result()
        finally:
            gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)

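    # Illustrative usage sketch: the table name and bucket below are placeholders
    # and assume the Cloud Storage bucket already exists.
    #
    #     tbl = Table([{'first': 'Jane', 'last': 'Doe'}])
    #     bq.copy(tbl, 'my_dataset.my_table', if_exists='append',
    #             tmp_gcs_bucket='my-temp-bucket')
    #
    # The table is staged as a CSV blob in the bucket and the blob is deleted once
    # the load job finishes, whether it succeeds or fails.
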
    def delete_table(self, table_name):
        """
        Delete a BigQuery table.

        `Args:`
            table_name: str
                The name of the table to delete.
        """
        table_ref = get_table_ref(self.client, table_name)
        self.client.delete_table(table_ref)

    def query(self, sql, parameters=None):
        """
        Run a BigQuery query and return the results as a Parsons table.

        To include python variables in your query, it is recommended to pass them as
        parameters, using the DB-API placeholder styles shown in the examples below
        (``%s`` for positional parameters and ``%(name)s`` for named parameters).
        Using the ``parameters`` argument ensures that values are escaped properly,
        and avoids SQL injection attacks.

        **Parameter Examples**

        .. code-block:: python

            name = "Beatrice O'Brady"
            sql = 'SELECT * FROM my_table WHERE name = %s'
            bq.query(sql, parameters=[name])

        .. code-block:: python

            name = "Beatrice O'Brady"
            sql = "SELECT * FROM my_table WHERE name = %(name)s"
            bq.query(sql, parameters={'name': name})

        `Args:`
            sql: str
                A valid BigQuery statement
            parameters: dict
                A dictionary of query parameters for BigQuery.

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """
        # get our connection and cursor
        cursor = self._dbapi.connect(self.client).cursor()

        # Run the query
        cursor.execute(sql, parameters)

        # We will use a temp file to cache the results so that they are not all living
        # in memory. We'll use pickle to serialize the results to file in order to maintain
        # the proper data types (e.g. integer).
        temp_filename = create_temp_file()

        wrote_header = False
        with open(temp_filename, 'wb') as temp_file:
            # Track whether we got data, since if we don't get any results we need to
            # return None
            got_results = False
            while True:
                batch = cursor.fetchmany(QUERY_BATCH_SIZE)
                if len(batch) == 0:
                    break

                got_results = True

                for row in batch:
                    # Make sure we write out the header once and only once
                    if not wrote_header:
                        wrote_header = True
                        header = list(row.keys())
                        pickle.dump(header, temp_file)

                    row_data = list(row.values())
                    pickle.dump(row_data, temp_file)

        if not got_results:
            return None

        ptable = petl.frompickle(temp_filename)
        final_table = Table(ptable)

        return final_table

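    # Illustrative usage sketch (the table name is a placeholder):
    #
    #     tbl = bq.query('SELECT name FROM my_dataset.my_table WHERE name = %s',
    #                    parameters=["Beatrice O'Brady"])
    #     if tbl is not None:   # query() returns None when there are no result rows
    #         print(tbl.num_rows)
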
    def table_exists(self, table_name):
        """
        Check whether or not the Google BigQuery table exists in the specified dataset.

        `Args:`
            table_name: str
                The name of the BigQuery table to check for
        `Returns:`
            bool
                True if the table exists in the specified dataset, False otherwise.
        """
        table_ref = get_table_ref(self.client, table_name)
        try:
            self.client.get_table(table_ref)
        except exceptions.NotFound:
            return False

        return True

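    # Illustrative usage sketch (the table name is a placeholder):
    #
    #     if not bq.table_exists('my_dataset.my_table'):
    #         bq.copy(tbl, 'my_dataset.my_table')
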
    @property
    def client(self):
        """
        Get the Google BigQuery client to use for making queries.

        `Returns:`
            `google.cloud.bigquery.client.Client`
        """
        if not self._client:
            # Create a BigQuery client to use to make the query
            self._client = bigquery.Client(project=self.project, location=self.location)

        return self._client

    def _generate_schema(self, tbl):
        stats = tbl.get_columns_type_stats()
        fields = []
        for stat in stats:
            petl_types = stat['type']
            best_type = 'str' if 'str' in petl_types else petl_types[0]
            field_type = self._bigquery_type(best_type)
            field = bigquery.schema.SchemaField(stat['name'], field_type)
            fields.append(field)
        return fields

    @staticmethod
    def _bigquery_type(tp):
        return BIGQUERY_TYPE_MAP[tp]

    def table(self, table_name):
        # Return a BigQuery table object
        return BigQueryTable(self, table_name)

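# Illustrative example of the type mapping used by _generate_schema above: a column
# whose petl type stats report only 'int' maps to INTEGER, while any column containing
# at least one 'str' value falls back to STRING.
#
#     GoogleBigQuery._bigquery_type('int')   # -> 'INTEGER'
#     GoogleBigQuery._bigquery_type('bool')  # -> 'BOOLEAN'
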
class BigQueryTable(BaseTable):
    # BigQuery table object.

    def drop(self, cascade=False):
        """
        Drop the table.
        """
        self.db.delete_table(self.table)

    def truncate(self):
        """
        Truncate the table.
        """
        # BigQuery does not support truncate natively, so we will "load" an empty dataset
        # with write disposition of "truncate"
        table_ref = get_table_ref(self.db.client, self.table)
        bq_table = self.db.client.get_table(table_ref)

        # BigQuery wants the schema when we load the data, so we will grab it from the table
        job_config = bigquery.LoadJobConfig()
        job_config.schema = bq_table.schema

        empty_table = Table([])
        self.db.copy(empty_table, self.table, if_exists='truncate', job_config=job_config)
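
# Illustrative usage sketch of BigQueryTable, obtained via GoogleBigQuery.table()
# (the table name is a placeholder):
#
#     bq = GoogleBigQuery()
#     my_table = bq.table('my_dataset.my_table')
#     my_table.truncate()   # loads an empty table with WRITE_TRUNCATE disposition
#     my_table.drop()       # deletes the underlying BigQuery table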