# Source code for the Parsons Google BigQuery connector.

import pickle
import uuid

from import bigquery
from import dbapi
from import exceptions
import petl

from parsons.databases.table import BaseTable
from parsons.etl import Table
from import setup_google_application_credentials
from import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

    'str': 'STRING',
    'float': 'FLOAT',
    'int': 'INTEGER',
    'bool': 'BOOLEAN',
    'datetime.datetime': 'DATETIME',
    '': 'DATE',
    'datetime.time': 'TIME',
    'dict': 'RECORD',

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.

def get_table_ref(client, table_name):
    """
    Build a table reference for ``table_name`` using the given client.

    `Args:`
        client: object
            Client exposing ``dataset(dataset_id)``; the object it returns must
            expose ``table(table_id)``.
        table_name: str
            The table ID, split into components by ``parse_table_name``.

    `Returns:`
        Whatever ``client.dataset(...).table(...)`` returns for the parsed
        dataset and table components.
    """
    name_parts = parse_table_name(table_name)
    # Only the dataset and table components are used here; the parsed project
    # component is not consulted.
    return client.dataset(name_parts['dataset']).table(name_parts['table'])

def parse_table_name(table_name):
    """
    Split a table ID into its project, dataset and table components.

    The ID is read from right to left so that partially qualified names work:
    ``table``, ``dataset.table`` and ``project.dataset.table`` are all
    accepted. Components that are not present are left as ``None``.

    `Args:`
        table_name: str
            The dot-separated table ID.

    `Returns:`
        dict
            A dictionary with the keys ``project``, ``dataset`` and ``table``.
    """
    parsed = {
        'project': None,
        'dataset': None,
        'table': None,
    }

    # The right-most segment is always the table, preceded by the dataset and
    # then the project. ``zip`` stops at the shorter sequence, so missing
    # components stay ``None`` and any segments beyond three are ignored.
    segments = reversed(table_name.split('.'))
    for key, segment in zip(('table', 'dataset', 'project'), segments):
        parsed[key] = segment

    return parsed

class GoogleBigQuery:
    """
    Class for querying BigQuery table and returning the data as Parsons tables.

    This class requires application credentials in the form of a json. It can be passed
    in the following ways:

    * Set an environmental variable named ``GOOGLE_APPLICATION_CREDENTIALS`` with the
      local path to the credentials json.

      Example: ``GOOGLE_APPLICATION_CREDENTIALS='path/to/creds.json'``

    * Pass in the path to the credentials using the ``app_creds`` argument.

    * Pass in a json string using the ``app_creds`` argument.

    Args:
        app_creds: str
            A credentials json string or a path to a json file. Not required
            if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
        project: str
            The project which the client is acting on behalf of. If not passed
            then will use the default inferred environment.
        location: str
            Default geographic location for tables
    """

    def __init__(self, app_creds=None, project=None, location=None):
        self.app_creds = app_creds

        setup_google_application_credentials(app_creds)

        self.project = project
        self.location = location

        # We will not create the client until we need to use it, since creating the client
        # without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
        # This attribute will be used to hold the client once we have created it.
        self._client = None

        # Kept as an attribute so that ``query`` goes through ``self._dbapi``
        # rather than the module-level name.
        self._dbapi = dbapi

        self.dialect = 'bigquery'

    def copy(self, table_obj, table_name, if_exists='fail',
             tmp_gcs_bucket=None, gcs_client=None, job_config=None, **load_kwargs):
        """
        Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.

        `Args:`
            table_obj: obj
                The Parsons Table to copy into BigQuery.
            table_name: str
                The table name to load the data into.
            if_exists: str
                If the table already exists, either ``fail``, ``append``, ``drop``
                or ``truncate`` the table.
            tmp_gcs_bucket: str
                The name of the Google Cloud Storage bucket to use to stage the data to load
                into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
            gcs_client: object
                The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
            job_config: object
                A LoadJobConfig object to provide to the underlying call to load_table_from_uri
                on the BigQuery client. The function will create its own if not provided.
                Note that a provided object is modified in place.
            **load_kwargs: kwargs
                Arguments to pass to the underlying load_table_from_uri call on the BigQuery
                client.

        `Raises:`
            ValueError
                If ``if_exists`` is not one of the accepted values, or if the table
                already exists and ``if_exists`` is ``fail``.
        """
        tmp_gcs_bucket = check_env.check('GCS_TEMP_BUCKET', tmp_gcs_bucket)

        if if_exists not in ['fail', 'truncate', 'append', 'drop']:
            raise ValueError(f'Unexpected value for if_exists: {if_exists}, must be one of '
                             '"append", "drop", "truncate", or "fail"')

        table_exists = self.table_exists(table_name)

        if not job_config:
            job_config = bigquery.LoadJobConfig()

        # Only fill in the schema and create disposition when the caller has not set them.
        if not job_config.schema:
            job_config.schema = self._generate_schema(table_obj)

        if not job_config.create_disposition:
            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED

        # These settings are always overridden, since the staged file is a CSV
        # with a header row.
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY

        if table_exists:
            if if_exists == 'fail':
                raise ValueError('Table already exists.')
            elif if_exists == 'drop':
                # The table is removed up front, so the WRITE_EMPTY disposition
                # set above still applies to the load.
                self.delete_table(table_name)
            elif if_exists == 'append':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
            elif if_exists == 'truncate':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

        gcs_client = gcs_client or GoogleCloudStorage()
        temp_blob_name = f'{uuid.uuid4()}.csv'
        temp_blob_uri = gcs_client.upload_table(table_obj, tmp_gcs_bucket, temp_blob_name)

        # load CSV from Cloud Storage into BigQuery
        table_ref = get_table_ref(self.client, table_name)
        try:
            load_job = self.client.load_table_from_uri(
                temp_blob_uri, table_ref,
                job_config=job_config, **load_kwargs,
            )
            load_job.result()
        finally:
            # Always remove the staged blob, whether or not the load succeeded.
            gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)

    def delete_table(self, table_name):
        """
        Delete a BigQuery table.

        `Args:`
            table_name: str
                The name of the table to delete.
        """
        table_ref = get_table_ref(self.client, table_name)
        self.client.delete_table(table_ref)

    def query(self, sql, parameters=None):
        """
        Run a BigQuery query and return the results as a Parsons table.

        To include python variables in your query, it is recommended to pass them as
        parameters, using the placeholder styles shown below (``%s`` with a list of
        values, or ``%(name)s`` with a dictionary). Using the ``parameters`` argument
        ensures that values are escaped properly, and avoids SQL injection attacks.

        **Parameter Examples**

        .. code-block:: python

            name = "Beatrice O'Brady"
            sql = 'SELECT * FROM my_table WHERE name = %s'
            bq.query(sql, parameters=[name])

        .. code-block:: python

            name = "Beatrice O'Brady"
            sql = "SELECT * FROM my_table WHERE name = %(name)s"
            bq.query(sql, parameters={'name': name})

        `Args:`
            sql: str
                A valid BigQuery statement
            parameters: list or dict
                The query parameter values, matching the placeholders used in ``sql``.

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options. ``None`` is returned
                when the query produces no rows.
        """
        # get our connection and cursor
        cursor = self._dbapi.connect(self.client).cursor()

        try:
            # Run the query
            cursor.execute(sql, parameters)

            # We will use a temp file to cache the results so that they are not all living
            # in memory. We'll use pickle to serialize the results to file in order to
            # maintain the proper data types (e.g. integer).
            temp_filename = create_temp_file()

            # Doubles as the "did we get any rows" flag: the header is written
            # exactly when the first row arrives.
            wrote_header = False
            with open(temp_filename, 'wb') as temp_file:
                while True:
                    batch = cursor.fetchmany(QUERY_BATCH_SIZE)
                    if not batch:
                        break

                    for row in batch:
                        # Make sure we write out the header once and only once
                        if not wrote_header:
                            wrote_header = True
                            pickle.dump(list(row.keys()), temp_file)

                        pickle.dump(list(row.values()), temp_file)
        finally:
            # Release the cursor whether or not the query succeeded.
            cursor.close()

        if not wrote_header:
            return None

        return Table(petl.frompickle(temp_filename))

    def table_exists(self, table_name):
        """
        Check whether or not the Google BigQuery table exists in the specified dataset.

        `Args:`
            table_name: str
                The name of the BigQuery table to check for
        `Returns:`
            bool
                True if the table exists in the specified dataset, false otherwise
        """
        table_ref = get_table_ref(self.client, table_name)
        try:
            self.client.get_table(table_ref)
        except exceptions.NotFound:
            return False

        return True

    @property
    def client(self):
        """
        Get the Google BigQuery client to use for making queries.

        The client is created on first access and reused afterwards.

        `Returns:`
            `google.cloud.bigquery.client.Client`
        """
        if not self._client:
            # Create a BigQuery client to use to make the query
            self._client = bigquery.Client(project=self.project, location=self.location)

        return self._client

    def _generate_schema(self, tbl):
        """
        Build a list of BigQuery schema fields from a table's column type statistics.

        `Args:`
            tbl: obj
                A table exposing ``get_columns_type_stats()``, where each entry
                has a ``name`` and a ``type`` list.
        `Returns:`
            list
                One ``SchemaField`` per column.
        """
        fields = []
        for stat in tbl.get_columns_type_stats():
            petl_types = stat['type']
            # A column containing any strings is loaded as a string; otherwise the
            # first reported type is used.
            best_type = 'str' if 'str' in petl_types else petl_types[0]
            field_type = self._bigquery_type(best_type)
            fields.append(bigquery.schema.SchemaField(stat['name'], field_type))
        return fields

    @staticmethod
    def _bigquery_type(tp):
        """
        Translate a Python type name into the matching BigQuery column type.

        Raises ``KeyError`` if the type name has no entry in ``BIGQUERY_TYPE_MAP``.
        """
        return BIGQUERY_TYPE_MAP[tp]

    def table(self, table_name):
        # Return a BigQuery table object
        return BigQueryTable(self, table_name)
class BigQueryTable(BaseTable):
    """BigQuery table object."""

    def drop(self, cascade=False):
        """
        Drop the table.

        ``cascade`` is accepted but not used by this implementation.
        """
        self.db.delete_table(self.table)

    def truncate(self):
        """
        Truncate the table.
        """
        # There is no direct truncate call used here; instead an empty table is
        # "loaded" with the truncate option so the existing rows are replaced.
        client = self.db.client
        existing_table = client.get_table(get_table_ref(client, self.table))

        # The load needs a schema, so reuse the one from the existing table.
        load_config = bigquery.LoadJobConfig()
        load_config.schema = existing_table.schema

        self.db.copy(Table([]), self.table, if_exists='truncate', job_config=load_config)