Source code for parsons.databases.mysql.mysql

from parsons import Table
from parsons.utilities import check_env
import petl
import mysql.connector as mysql
from contextlib import contextmanager
from parsons.utilities import files
import pickle
import logging
import os
from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.table import BaseTable
from parsons.databases.mysql.create_table import MySQLCreateTable
from parsons.databases.alchemy import Alchemy

# Maximum number of rows pulled from the cursor per ``fetchmany()`` call when
# reading query results, so that we avoid holding a huge list of rows in
# Python memory at once.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class MySQL(DatabaseConnector, MySQLCreateTable, Alchemy):
    """
    Connect to a MySQL database.

    `Args:`
        username: str
            Required if env variable ``MYSQL_USERNAME`` not populated
        password: str
            Required if env variable ``MYSQL_PASSWORD`` not populated
        host: str
            Required if env variable ``MYSQL_HOST`` not populated
        db: str
            Required if env variable ``MYSQL_DB`` not populated
        port: int
            The port to connect on. Defaults to ``3306``. Pass ``None`` to
            read the port from the env variable ``MYSQL_PORT`` instead (falling
            back to ``3306`` when that variable is not set).
    """

    def __init__(self, host=None, username=None, password=None, db=None, port=3306):
        super().__init__()
        self.username = check_env.check("MYSQL_USERNAME", username)
        self.password = check_env.check("MYSQL_PASSWORD", password)
        self.host = check_env.check("MYSQL_HOST", host)
        self.db = check_env.check("MYSQL_DB", db)

        if port:
            self.port = port
        else:
            # Environment variables are always strings, so convert explicitly;
            # a non-numeric value raises ValueError here rather than failing
            # later at connect time.
            env_port = os.environ.get("MYSQL_PORT")
            self.port = int(env_port) if env_port else 3306

    @contextmanager
    def connection(self):
        """
        Generate a MySQL connection. The connection is set up as a python "context manager",
        so it will be closed automatically (and all queries committed) when the connection
        goes out of scope.

        When using the connection, make sure to put it in a ``with`` block (necessary for
        any context manager):
        ``with mysql.connection() as conn:``

        If a ``mysql.connector`` error is raised inside the ``with`` block, the transaction
        is rolled back and the error is re-raised. Any other exception skips both the
        commit and the explicit rollback; the connection is simply closed.

        `Returns:`
            MySQL `connection` object
        """

        # Create a mysql connection
        connection = mysql.connect(
            host=self.host,
            user=self.username,
            passwd=self.password,
            database=self.db,
            port=self.port,
        )

        try:
            yield connection
        except mysql.Error:
            connection.rollback()
            raise
        else:
            connection.commit()
        finally:
            connection.close()

    @contextmanager
    def cursor(self, connection):
        """
        Generate a buffered cursor on an existing connection. The cursor is closed
        automatically when the ``with`` block exits.

        `Args:`
            connection: obj
                A connection object obtained from ``mysql.connection()``
        `Returns:`
            MySQL `cursor` object
        """
        cur = connection.cursor(buffered=True)

        try:
            yield cur
        finally:
            cur.close()

    def query(self, sql, parameters=None):
        """
        Execute a query against the database. Will return ``None`` if the query does not
        produce a result set (for example an ``INSERT`` or a DDL statement).

        To include python variables in your query, it is recommended to pass them as parameters,
        following the `mysql style <https://security.openstack.org/guidelines/dg_parameterize-database-queries.html>`_.
        Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL
        injection attacks.

        **Parameter Examples**

        .. code-block:: python

            # Note that the name contains a quote, which could break your query if not escaped
            # properly.
            name = "Beatrice O'Brady"
            sql = "SELECT * FROM my_table WHERE name = %s"
            mysql.query(sql, parameters=[name])

        .. code-block:: python

            names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
            placeholders = ', '.join('%s' for item in names)
            sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
            mysql.query(sql, parameters=names)

        `Args:`
            sql: str
                A valid SQL statement
            parameters: list
                A list of python variables to be converted into SQL values in your query

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.

        """  # noqa: E501

        with self.connection() as connection:
            return self.query_with_connection(sql, connection, parameters=parameters)

    def query_with_connection(self, sql, connection, parameters=None, commit=True):
        """
        Execute a query against the database, with an existing connection. Useful for batching
        queries together. Will return ``None`` if the last statement executed does not produce
        a result set (for example an ``INSERT`` or a DDL statement).

        ``sql`` is split on ``;`` and each non-empty statement is executed in order. Note
        that the split is purely textual, so a literal ``;`` inside the SQL text (for
        instance inside a quoted string) also splits the statement; pass such values through
        ``parameters`` instead. The same ``parameters`` are passed to every statement.

        `Args:`
            sql: str
                A valid SQL statement
            connection: obj
                A connection object obtained from ``mysql.connection()``
            parameters: list
                A list of python variables to be converted into SQL values in your query
            commit: boolean
                Whether to commit the transaction immediately. If ``False`` the transaction will
                be committed when the connection goes out of scope and is closed (or you can
                commit manually with ``connection.commit()``).

        `Returns:`
            Parsons Table
                See :ref:`parsons-table` for output options.
        """
        with self.cursor(connection) as cursor:
            # The python connector can only execute a single sql statement, so we will
            # break up each statement and execute them separately.
            for statement in sql.split(";"):
                statement = statement.strip()
                # Skip the empty fragments left by a trailing ";" or by blank
                # space between two separators.
                if not statement:
                    continue
                logger.debug(f"SQL Query: {statement}")
                cursor.execute(statement, parameters)

            if commit:
                connection.commit()

            # If the SQL query provides no response, then return None
            if not cursor.description:
                logger.debug("Query returned 0 rows")
                return None

            # Fetch the data in batches, and "pickle" the rows to a temp file.
            # (We pickle rather than writing to, say, a CSV, so that we maintain
            # all the type information for each field.)
            # The pickle file is written and read back by this method only; it
            # never contains data pickled by anyone else.
            temp_file = files.create_temp_file()

            with open(temp_file, "wb") as f:
                # Grab the header
                pickle.dump(cursor.column_names, f)

                while True:
                    batch = cursor.fetchmany(QUERY_BATCH_SIZE)
                    if len(batch) == 0:
                        break

                    logger.debug(f"Fetched {len(batch)} rows.")
                    for row in batch:
                        pickle.dump(row, f)

            # Load a Table from the file
            final_tbl = Table(petl.frompickle(temp_file))

            logger.debug(f"Query returned {final_tbl.num_rows} rows.")
            return final_tbl

    def copy(
        self,
        tbl: Table,
        table_name: str,
        if_exists: str = "fail",
        chunk_size: int = 1000,
        strict_length: bool = True,
    ):
        """
        Copy a :ref:`parsons-table` to the database.

        .. note::
            This method utilizes extended inserts rather `LOAD DATA INFILE` since
            many MySQL Database configurations do not allow data files to be
            loaded. It results in a minor performance hit compared to `LOAD DATA`.

        Row values are sent as query parameters rather than being written into the
        SQL text, so values containing quotes, semicolons or ``None`` are inserted
        correctly.

        `Args:`
            tbl: parsons.Table
                A Parsons table object
            table_name: str
                The destination schema and table (e.g. ``my_schema.my_table``)
            if_exists: str
                If the table already exists, either ``fail``, ``append``, ``drop``
                or ``truncate`` the table.
            chunk_size: int
                The number of rows to insert per query.
            strict_length: bool
                If the database table needs to be created, strict_length determines whether
                the created table's column sizes will be sized to exactly fit the current data,
                or if their size will be rounded up to account for future values being larger
                then the current dataset. defaults to ``True``
        """

        if tbl.num_rows == 0:
            logger.info("Parsons table is empty. Table will not be created.")
            return None

        with self.connection() as connection:
            # Create table if not exists
            if self._create_table_precheck(connection, table_name, if_exists):
                sql = self.create_statement(tbl, table_name, strict_length=strict_length)
                self.query_with_connection(sql, connection, commit=False)
                logger.info(f"Table {table_name} created.")

            # Chunk tables in batches of ``chunk_size`` rows (1K by default),
            # though this can be tuned and optimized further.
            for chunk in tbl.chunk(chunk_size):
                sql, parameters = self._insert_statement_parameterized(chunk, table_name)
                self.query_with_connection(
                    sql, connection, parameters=parameters, commit=False
                )

    def _insert_statement_parameterized(self, tbl, table_name):
        """
        Build an extended ``INSERT`` statement that uses ``%s`` placeholders for
        every value, together with the flat list of values to bind.

        `Args:`
            tbl: parsons.Table
                The rows to insert
            table_name: str
                The destination table
        `Returns:`
            tuple
                ``(sql, parameters)`` where ``parameters`` lists the values row by
                row, in the same order as the placeholders in ``sql``.
        """
        rows = [tuple(row) for row in tbl.data]

        # One "(%s,%s,...)" group per row, sized to that row.
        placeholders = ",".join(
            "(" + ",".join(["%s"] * len(row)) + ")" for row in rows
        )
        parameters = [value for row in rows for value in row]

        sql = f"INSERT INTO {table_name} ({','.join(tbl.columns)}) VALUES {placeholders};"

        return sql, parameters

    def _insert_statement(self, tbl, table_name):
        """
        Convert the table data into a string for bulk importing.

        .. warning::
            Kept for backward compatibility only; ``copy`` no longer uses it. Values
            are written into the SQL text using their Python representation, with no
            SQL escaping: ``None`` is rendered as ``None`` rather than ``NULL``, values
            of single column tables are not quoted at all, and a ``;`` inside a value
            is split into separate statements by ``query_with_connection``. Do not use
            it with untrusted data.

        `Args:`
            tbl: parsons.Table
                The rows to insert
            table_name: str
                The destination table
        `Returns:`
            str
                The ``INSERT`` statement
        """

        # Single column tables
        if len(tbl.columns) == 1:
            values = [f"({row[0]})" for row in tbl.data]

        # Multi-column tables
        else:
            values = [str(row) for row in tbl.data]

        # Create full insert statement
        sql = f"""INSERT INTO {table_name} ({','.join(tbl.columns)}) VALUES {",".join(values)};"""

        return sql

    def _create_table_precheck(self, connection, table_name, if_exists):
        """
        Helper to determine what to do when you need a table that may already exist.

        `Args:`
            connection: obj
                A connection object obtained from ``mysql.connection()``
            table_name: str
                The table to check
            if_exists: str
                If the table already exists, either ``fail``, ``append``, ``drop``,
                or ``truncate`` the table.
        `Returns:`
            bool
                True if the table needs to be created, False otherwise.
        `Raises:`
            ValueError
                If ``if_exists`` is not one of the accepted values, or if the table
                exists and ``if_exists`` is ``fail``.
        """

        if if_exists not in ["fail", "truncate", "append", "drop"]:
            raise ValueError("Invalid value for `if_exists` argument")

        # A missing table always needs to be created.
        if not self.table_exists(table_name):
            return True

        # The table exists, evaluate the if_exists argument for next steps.
        if if_exists == "fail":
            raise ValueError("Table already exists.")

        if if_exists == "drop":
            sql = f"DROP TABLE {table_name}"
            self.query_with_connection(sql, connection, commit=False)
            logger.info(f"{table_name} dropped.")
            return True

        if if_exists == "truncate":
            sql = f"TRUNCATE TABLE {table_name}"
            self.query_with_connection(sql, connection, commit=False)
            logger.info(f"{table_name} truncated.")

        # "truncate" and "append" both reuse the existing table.
        return False

    def table_exists(self, table_name: str) -> bool:
        """
        Check if a table or view exists in the database.

        `Args:`
            table_name: str
                The table name

        `Returns:`
            boolean
                ``True`` if the table exists and ``False`` if it does not.
        """

        # The name is passed as a parameter so that it is escaped properly.
        result = self.query("SHOW TABLES LIKE %s", parameters=[table_name])

        if result is None:
            return False

        # ``LIKE`` treats "_" and "%" in the name as wildcards, so several tables
        # may be returned; require an exact match on any of them rather than
        # looking only at the first row.
        return any(row[0] == table_name for row in result.data)

    def table(self, table_name):
        """
        Get a table object for ``table_name``, bound to this connector.

        `Args:`
            table_name: str
                The table name
        `Returns:`
            BaseTable
        """
        # Return a BaseTable table object
        return BaseTable(self, table_name)