Source code for parsons.databases.mysql.mysql

from parsons import Table
from parsons.utilities import check_env
import petl
import mysql.connector as mysql
from contextlib import contextmanager
from parsons.utilities import files
import pickle
import logging
import os
from parsons.databases.table import BaseTable
from parsons.databases.mysql.create_table import MySQLCreateTable

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000

logger = logging.getLogger(__name__)


[docs]class MySQL(MySQLCreateTable): """ Connect to a MySQL database. `Args:` username: str Required if env variable ``MYSQL_USERNAME`` not populated password: str Required if env variable ``MYSQL_PASSWORD`` not populated host: str Required if env variable ``MYSQL_HOST`` not populated db: str Required if env variable ``MYSQL_DB`` not populated port: int Can be set by env variable ``MYSQL_PORT`` or argument. """ def __init__(self, host=None, username=None, password=None, db=None, port=3306): self.username = check_env.check('MYSQL_USERNAME', username) self.password = check_env.check('MYSQL_PASSWORD', password) self.host = check_env.check('MYSQL_HOST', host) self.db = check_env.check('MYSQL_DB', db) self.port = port or os.environ.get('MYSQL_PORT')
[docs] @contextmanager def connection(self): """ Generate a MySQL connection. The connection is set up as a python "context manager", so it will be closed automatically (and all queries committed) when the connection goes out of scope. When using the connection, make sure to put it in a ``with`` block (necessary for any context manager): ``with mysql.connection() as conn:`` `Returns:` MySQL `connection` object """ # Create a mysql connection and cursor connection = mysql.connect(host=self.host, user=self.username, passwd=self.password, database=self.db, port=self.port) try: yield connection except mysql.Error: connection.rollback() raise else: connection.commit() finally: connection.close()
@contextmanager def cursor(self, connection): cur = connection.cursor(buffered=True) try: yield cur finally: cur.close()
[docs] def query(self, sql, parameters=None): """ Execute a query against the database. Will return ``None``if the query returns zero rows. To include python variables in your query, it is recommended to pass them as parameters, following the `mysql style <https://security.openstack.org/guidelines/dg_parameterize-database-queries.html>`_. Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL injection attacks. **Parameter Examples** .. code-block:: python # Note that the name contains a quote, which could break your query if not escaped # properly. name = "Beatrice O'Brady" sql = "SELECT * FROM my_table WHERE name = %s" mysql.query(sql, parameters=[name]) .. code-block:: python names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"] placeholders = ', '.join('%s' for item in names) sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})" mysql.query(sql, parameters=names) `Args:` sql: str A valid SQL statement parameters: list A list of python variables to be converted into SQL values in your query `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ # noqa: E501 with self.connection() as connection: return self.query_with_connection(sql, connection, parameters=parameters)
[docs] def query_with_connection(self, sql, connection, parameters=None, commit=True): """ Execute a query against the database, with an existing connection. Useful for batching queries together. Will return ``None`` if the query returns zero rows. `Args:` sql: str A valid SQL statement connection: obj A connection object obtained from ``mysql.connection()`` parameters: list A list of python variables to be converted into SQL values in your query commit: boolean Whether to commit the transaction immediately. If ``False`` the transaction will be committed when the connection goes out of scope and is closed (or you can commit manually with ``connection.commit()``). `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ with self.cursor(connection) as cursor: # The python connector can only execute a single sql statement, so we will # break up each statement and execute them separately. for s in sql.strip().split(';'): if len(s) != 0: logger.debug(f'SQL Query: {sql}') cursor.execute(s, parameters) if commit: connection.commit() # If the SQL query provides no response, then return None if not cursor.description: logger.debug('Query returned 0 rows') return None else: # Fetch the data in batches, and "pickle" the rows to a temp file. # (We pickle rather than writing to, say, a CSV, so that we maintain # all the type information for each field.) temp_file = files.create_temp_file() with open(temp_file, 'wb') as f: # Grab the header pickle.dump(cursor.column_names, f) while True: batch = cursor.fetchmany(QUERY_BATCH_SIZE) if len(batch) == 0: break logger.debug(f'Fetched {len(batch)} rows.') for row in batch: pickle.dump(row, f) # Load a Table from the file final_tbl = Table(petl.frompickle(temp_file)) logger.debug(f'Query returned {final_tbl.num_rows} rows.') return final_tbl
[docs] def copy(self, tbl, table_name, if_exists='fail', chunk_size=1000): """ Copy a :ref:`parsons-table` to the database. .. note:: This method utilizes extended inserts rather `LOAD DATA INFILE` since many MySQL Database configurations do not allow data files to be loaded. It results in a minor performance hit compared to `LOAD DATA`. tbl: parsons.Table A Parsons table object table_name: str The destination schema and table (e.g. ``my_schema.my_table``) if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. chunk_size: int The number of rows to insert per query. """ if tbl.num_rows == 0: logger.info('Parsons table is empty. Table will not be created.') return None with self.connection() as connection: # Create table if not exists if self._create_table_precheck(connection, table_name, if_exists): sql = self.create_statement(tbl, table_name) self.query_with_connection(sql, connection, commit=False) logger.info(f'Table {table_name} created.') # Chunk tables in batches of 1K rows, though this can be tuned and # optimized further. for t in tbl.chunk(chunk_size): sql = self._insert_statement(t, table_name) self.query_with_connection(sql, connection, commit=False)
def _insert_statement(self, tbl, table_name): """ Convert the table data into a string for bulk importing. """ # Single column tables if len(tbl.columns) == 1: values = [f"({row[0]})" for row in tbl.data] # Multi-column tables else: values = [str(row) for row in tbl.data] # Create full insert statement sql = f"""INSERT INTO {table_name} ({','.join(tbl.columns)}) VALUES {",".join(values)};""" return sql def _create_table_precheck(self, connection, table_name, if_exists): """ Helper to determine what to do when you need a table that may already exist. `Args:` connection: obj A connection object obtained from ``mysql.connection()`` table_name: str The table to check if_exists: str If the table already exists, either ``fail``, ``append``, ``drop``, or ``truncate`` the table. `Returns:` bool True if the table needs to be created, False otherwise. """ if if_exists not in ['fail', 'truncate', 'append', 'drop']: raise ValueError("Invalid value for `if_exists` argument") # If the table exists, evaluate the if_exists argument for next steps. if self.table_exists(table_name): if if_exists == 'fail': raise ValueError('Table already exists.') if if_exists == 'truncate': sql = f"TRUNCATE TABLE {table_name}" self.query_with_connection(sql, connection, commit=False) logger.info(f"{table_name} truncated.") return False if if_exists == 'drop': sql = f"DROP TABLE {table_name}" self.query_with_connection(sql, connection, commit=False) logger.info(f"{table_name} dropped.") return True else: return True
[docs] def table_exists(self, table_name): """ Check if a table or view exists in the database. `Args:` table_name: str The table name view: boolean Check to see if a view exists by the same name `Returns:` boolean ``True`` if the table exists and ``False`` if it does not. """ if self.query(f"SHOW TABLES LIKE '{table_name}'").first == table_name: return True else: return False
def table(self, table_name): # Return a MySQL table object return MySQLTable(self, table_name)
class MySQLTable(BaseTable): # MySQL table object. def get_rows(self, offset=0, chunk_size=None): """ Get rows from a table. """ sql = f"SELECT * FROM {self.table}" if chunk_size: sql += f" LIMIT {chunk_size}" if offset: sql += f" ,{offset}" return self.db.query(sql) def get_new_rows(self, primary_key, cutoff_value, offset=0, chunk_size=None): """ Get rows that have a greater primary key value than the one provided. It will select every value greater than the provided value. """ sql = f""" SELECT * FROM {self.table} WHERE {primary_key} > {cutoff_value} """ if chunk_size: sql += f" LIMIT {chunk_size}, {offset};" return self.db.query(sql)