import logging
# import pkgutil
logger = logging.getLogger(__name__)
[docs]
class RedshiftTableUtilities(object):
def __init__(self):
pass
[docs]
def table_exists(self, table_name: str, view: bool = True) -> bool:
"""
Check if a table or view exists in the database.
`Args:`
table_name: str
The table name and schema (e.g. ``myschema.mytable``).
view: boolean
Check to see if a view exists by the same name
`Returns:`
boolean
``True`` if the table exists and ``False`` if it does not.
"""
with self.connection() as connection:
return self.table_exists_with_connection(table_name, connection, view)
def table_exists_with_connection(self, table_name, connection, view=True):
table_name = table_name.lower().split(".")
table_name = [x.strip() for x in table_name]
# Check in pg tables for the table
sql = """select count(*) from pg_tables where schemaname='{}' and
tablename='{}';""".format(table_name[0], table_name[1])
# TODO maybe convert these queries to use self.query_with_connection
with self.cursor(connection) as cursor:
cursor.execute(sql)
result = cursor.fetchone()[0]
# Check in the pg_views for the table
if view:
sql = """select count(*) from pg_views where schemaname='{}' and
viewname='{}';""".format(table_name[0], table_name[1])
cursor.execute(sql)
result += cursor.fetchone()[0]
# If in either, return boolean
if result >= 1:
logger.debug(f"{table_name[0]}.{table_name[1]} exists.")
return True
else:
logger.debug(f"{table_name[0]}.{table_name[1]} does NOT exist.")
return False
[docs]
def get_row_count(self, table_name):
"""
Return the row count of a table.
**SQL Code**
.. code-block:: sql
SELECT COUNT(*) FROM myschema.mytable
`Args:`
table_name: str
The schema and name (e.g. ``myschema.mytable``) of the table.
`Returns:`
int
"""
count_query = self.query(f"select count(*) from {table_name}")
return count_query[0]["count"]
[docs]
def rename_table(self, table_name, new_table_name):
"""
Rename an existing table.
.. note::
You cannot move schemas when renaming a table. Instead, utilize
the :meth:`table_duplicate()`. method.
Args:
table_name: str
Name of existing schema and table (e.g. ``myschema.oldtable``)
new_table_name: str
New name for table with the schema omitted (e.g. ``newtable``).
"""
sql = f"alter table {table_name} rename to {new_table_name}"
self.query(sql)
logger.info(f"{table_name} renamed to {new_table_name}")
[docs]
def move_table(self, source_table, new_table, drop_source_table=False):
"""
Move an existing table in the database.It will inherit encoding, sortkey
and distkey. **Once run, the source table rows will be empty.** This is
more efficiant than running ``"create newtable as select * from oldtable"``.
For more information see: `ALTER TABLE APPEND <https://docs.aws.amazon.com/redshift/latest/dg/r_ALTER_TABLE_APPEND.html>`_
Args:
source_table: str
Name of existing schema and table (e.g. ``my_schema.old_table``)
new_table: str
New name of schema and table (e.g. ``my_schema.newtable``)
drop_original: boolean
Drop the source table.
Returns:
None
""" # noqa: E501,E261
# To Do: Add the grants
# To Do: Argument for if the table exists?
# To Do: Add the ignore extra kwarg.
create_sql = f"create table {new_table} (like {source_table});"
alter_sql = f"alter table {new_table} append from {source_table}"
logger.info(f"Creating empty {new_table} from {source_table}.")
self.query(create_sql)
with self.connection() as conn:
# An ALTER TABLE statement can't be run within a block, meaning
# that it needs to be committed on running. To enable this,
# the connection must be set to autocommit.
conn.set_session(autocommit=True)
logger.info(f"Moving data from {source_table} to {new_table}.")
self.query_with_connection(alter_sql, conn)
if drop_source_table:
self.query(f"drop table {source_table};")
logger.info(f"{source_table} dropped.")
logger.info(f"{source_table} data moved from {new_table} .")
def _create_table_precheck(self, connection, table_name, if_exists):
"""
Helper to determine what to do when you need a table that may already exist.
`Args:`
connection: obj
A connection object obtained from ``redshift.connection()``
table_name: str
The table to check
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``,
or ``truncate`` the table.
`Returns:`
bool
True if the table needs to be created, False otherwise.
"""
if if_exists not in ["fail", "truncate", "append", "drop"]:
raise ValueError("Invalid value for `if_exists` argument")
exists = self.table_exists_with_connection(table_name, connection)
if exists and if_exists in ["fail", "truncate", "append"]:
if if_exists == "fail":
raise ValueError("Table already exists.")
elif if_exists == "truncate":
truncate_sql = f"truncate table {table_name}"
self.query_with_connection(truncate_sql, connection, commit=False)
else:
if exists and if_exists == "drop":
logger.debug(f"Table {table_name} exist, will drop...")
drop_sql = f"drop table {table_name};\n"
self.query_with_connection(drop_sql, connection, commit=False)
return True
return False
[docs]
def populate_table_from_query(
self, query, destination_table, if_exists="fail", distkey=None, sortkey=None
):
"""
Populate a Redshift table with the results of a SQL query, creating the table if it
doesn't yet exist.
`Args:`
query: str
The SQL query
destination_table: str
Name of destination schema and table (e.g. ``mys_chema.new_table``)
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``,
or ``truncate`` the table.
distkey: str
The column to use as the distkey for the table.
sortkey: str
The column to use as the sortkey for the table.
"""
with self.connection() as conn:
should_create = self._create_table_precheck(conn, destination_table, if_exists)
if should_create:
logger.info(f"Creating table {destination_table} from query...")
sql = f"create table {destination_table}"
if distkey:
sql += f" distkey({distkey})"
if sortkey:
sql += f" sortkey({sortkey})"
sql += f" as {query}"
else:
logger.info(f"Inserting data into {destination_table} from query...")
sql = f"insert into {destination_table} ({query})"
self.query_with_connection(sql, conn, commit=False)
logger.info(f"{destination_table} created from query")
[docs]
def duplicate_table(
self,
source_table,
destination_table,
where_clause="",
if_exists="fail",
drop_source_table=False,
):
"""
Create a copy of an existing table (or subset of rows) in a new
table. It will inherit encoding, sortkey and distkey.
`Args:`
source_table: str
Name of existing schema and table (e.g. ``myschema.oldtable``)
destination_table: str
Name of destination schema and table (e.g. ``myschema.newtable``)
where_clause: str
An optional where clause (e.g. ``where org = 1``).
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``,
or ``truncate`` the table.
drop_source_table: boolean
Drop the source table
"""
with self.connection() as conn:
should_create = self._create_table_precheck(conn, destination_table, if_exists)
if should_create:
logger.info(f"Creating {destination_table} from {source_table}...")
create_sql = f"create table {destination_table} (like {source_table})"
self.query_with_connection(create_sql, conn, commit=False)
logger.info(f"Transferring data to {destination_table} from {source_table}")
select_sql = f"select * from {source_table} {where_clause}"
insert_sql = f"insert into {destination_table} ({select_sql})"
self.query_with_connection(insert_sql, conn, commit=False)
if drop_source_table:
logger.info(f"Dropping table {source_table}...")
drop_sql = f"drop table {source_table}"
self.query_with_connection(drop_sql, conn, commit=False)
logger.info(f"{destination_table} created from {source_table}.")
[docs]
def union_tables(self, new_table_name, tables, union_all=True, view=False):
"""
Union a series of table into a new table.
Args:
new_table_name: str
The new table and schema (e.g. ``myschema.newtable``)
tables: list
A list of tables to union
union_all: boolean
If ``False`` will deduplicate rows. If ``True`` will include
duplicate rows.
view: boolean
Create a view rather than a static table
Returns:
None
"""
union_type = " UNION ALL" if union_all else " UNION"
table_type = "VIEW" if view else "TABLE"
sql = f"CREATE {table_type} {new_table_name} AS"
for index, t in enumerate(tables):
if index != 0:
sql += union_type
sql += f" SELECT * FROM {t}"
self.query(sql)
logger.info(f"Created {new_table_name} from {', '.join(tables)}")
[docs]
def get_tables(self, schema=None, table_name=None):
"""
List the tables in a schema including metadata.
Args:
schema: str
Filter by a schema
table_name: str
Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.info("Retrieving tables info.")
sql = "select * from pg_tables"
if schema or table_name:
sql += " where"
if schema:
sql += f" schemaname = '{schema}'"
if table_name:
if schema:
sql += " and"
sql += f" tablename = '{table_name}'"
return self.query(sql)
[docs]
def get_table_stats(self, schema=None, table_name=None):
"""
List the tables statistics includes row count and size.
.. warning::
This method is only accessible by Redshift *superusers*.
`Args:`
schema: str
Filter by a schema
table_name: str
Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.info("Retrieving table statistics.")
sql = "select * from svv_table_info"
if schema or table_name:
sql += " where"
if schema:
sql += f" schema = '{schema}'"
if table_name:
if schema:
sql += " and "
sql += f" \"table\" = '{table_name}'"
return self.query(sql)
[docs]
def get_columns(self, schema, table_name):
"""
Gets the column names (and some other column info) for a table.
If you just need the column names, run ``get_columns_list()`` as it is faster.
.. code-block:: python
for col in rs.get_columns('some_schema', 'some_table'):
print(col)
`Args:`
schema: str
The schema name
table_name: str
The table name
`Returns:`
A dict mapping column name to a dict with extra info. The keys of the dict are ordered
just like the columns in the table. The extra info is a dict with format
.. code-block:: python
{
'data_type': str,
'max_length': int or None,
'max_precision': int or None,
'max_scale': int or None,
'is_nullable': bool
}
"""
query = f"""
select ordinal_position,
column_name,
data_type,
character_maximum_length as max_length,
numeric_precision as max_precision,
numeric_scale as max_scale,
is_nullable
from information_schema.columns
where table_name = '{table_name}'
and table_schema = '{schema}'
order by ordinal_position
"""
return {
row["column_name"]: {
"data_type": row["data_type"],
"max_length": row["max_length"],
"max_precision": row["max_precision"],
"max_scale": row["max_scale"],
"is_nullable": row["is_nullable"] == "YES",
}
for row in self.query(query)
}
[docs]
def get_columns_list(self, schema, table_name):
"""
Gets the just the column names for a table.
`Args:`
schema: str
The schema name
table_name: str
The table name
`Returns:`
A list of column names.
"""
schema = f'"{schema}"' if not (schema.startswith('"') and schema.endswith('"')) else schema
table_name = (
f'"{table_name}"'
if not (table_name.startswith('"') and table_name.endswith('"'))
else table_name
)
first_row = self.query(f"select * from {schema}.{table_name} limit 1")
return first_row.columns
[docs]
def get_views(self, schema=None, view=None):
"""
List views.
Args:
schema: str
Filter by a schema
view: str
Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.info("Retrieving views info.")
sql = """
select table_schema as schema_name,
table_name as view_name,
view_definition
from information_schema.views
where table_schema not in ('information_schema', 'pg_catalog')
"""
if schema:
sql += f" and table_schema = '{schema}'"
if view:
sql += f" and table_name = '{view}'"
return self.query(sql)
[docs]
def get_queries(self):
"""
Return the Current queries running and queueing, along with resource consumption.
.. warning::
Must be a Redshift superuser to run this method.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
logger.info("Retrieving running and queued queries.")
# Lifted from Redshift Utils https://github.com/awslabs/amazon-redshift-utils/blob/master/src/AdminScripts/running_queues.sql # noqa: E501
sql = """
select trim(u.usename) as user,
s.pid,
q.xid,
q.query,
q.service_class as service_class,
q.slot_count as slot,
date_trunc('second',
q.wlm_start_time) as start,
decode(trim(q.state),
'Running',
'Run',
'QueuedWaiting',
'Queue',
'Returning',
'Return',trim(q.state)) as state,
q.queue_Time/1000000 as queue_sec,
q.exec_time/1000000 as exec_sec,
m.cpu_time/1000000 cpu_sec,
m.blocks_read read_mb,
decode(m.blocks_to_disk,-1,null,m.blocks_to_disk) spill_mb,
m2.rows as return_rows,
m3.rows as NL_rows,
substring(replace(nvl(qrytext_cur.text,trim(translate(s.text,chr(10)||chr(13)||chr(9) ,''))),'\\n',' '),1,90) as sql, -- # noqa: E501
trim(decode(event&1,1,'SK ','') || decode(event&2,2,'Del ','') || decode(event&4,4,'NL ','') || decode(event&8,8,'Dist ','') || decode(event&16,16,'Bcast ','') || decode(event&32,32,'Stats ','')) as Alert -- # noqa: E501
from stv_wlm_query_state q
left outer join stl_querytext s on (s.query=q.query and sequence = 0)
left outer join stv_query_metrics m on ( q.query = m.query and m.segment=-1 and m.step=-1 )
left outer join stv_query_metrics m2 on ( q.query = m2.query and m2.step_type = 38 )
left outer join ( select query, sum(rows) as rows from stv_query_metrics m3 where step_type = 15 group by 1) as m3 on ( q.query = m3.query ) -- # noqa: E501
left outer join pg_user u on ( s.userid = u.usesysid )
LEFT OUTER JOIN (SELECT ut.xid,'CURSOR ' || TRIM( substring ( TEXT from strpos(upper(TEXT),'SELECT') )) as TEXT
FROM stl_utilitytext ut
WHERE sequence = 0
AND upper(TEXT) like 'DECLARE%'
GROUP BY text, ut.xid) qrytext_cur ON (q.xid = qrytext_cur.xid)
left outer join ( select query,sum(decode(trim(split_part(event,':',1)),'Very selective query filter',1,'Scanned a large number of deleted rows',2,'Nested Loop Join in the query plan',4,'Distributed a large number of rows across the network',8,'Broadcasted a large number of rows across the network',16,'Missing query planner statistics',32,0)) as event from STL_ALERT_EVENT_LOG -- # noqa: E501
where event_time >= dateadd(hour, -8, current_Date) group by query ) as alrt on alrt.query = q.query -- # noqa: E501
"""
return self.query(sql)
[docs]
def get_max_value(self, table_name, value_column):
"""
Return the max value from a table.
`Args:`
table_name: str
Schema and table name
value_column: str
The column containing the values
"""
return self.query(f"SELECT MAX({value_column}) value from {table_name}")[0]["value"]
[docs]
def get_object_type(self, object_name):
"""
Get object type.
One of `view`, `table`, `index`, `sequence`, or `TOAST table`.
`Args:`
object_name: str
The schema.obj for which to get the object type.
`Returns:`
`str` of the object type.
"""
sql_obj_type = f"""
select n.nspname||'.'||c.relname as objname
, case
when relkind='v' then 'view'
when relkind='r' then 'table'
when relkind='i' then 'index'
when relkind='s' then 'sequence'
when relkind='t' then 'TOAST table'
end as object_name
from pg_catalog.pg_class as c
inner join pg_catalog.pg_namespace as n
on c.relnamespace = n.oid
where objname='{object_name}'
"""
tbl = self.query(sql_obj_type)
if tbl.num_rows == 0:
logger.info(f"{object_name} doesn't exist.")
return None
return tbl[0]["object_name"]
[docs]
def is_view(self, object_name):
"""
Return true if the object is a view.
`Args:`
object_name: str
The schema.obj to test if it's a view.
`Returns:`
`bool`
"""
is_view = self.get_object_type(object_name) == "view"
logger.info(f"{object_name} is {'' if is_view else 'not'} a view.")
return is_view
[docs]
def is_table(self, object_name):
"""
Return true if the object is a table.
`Args:`
object_name: str
The schema.obj to test if it's a table.
`Returns:`
`bool`
"""
is_table = self.get_object_type(object_name) == "table"
logger.info(f"{object_name} is {'' if is_table else 'not'} a table.")
return is_table
[docs]
def get_table_definition(self, table):
"""
Get the table definition (i.e. the create statement).
`Args:`
table: str
The schema.table for which to get the table definition.
`Returns:`
str
"""
schema, table = self.split_full_table_name(table)
if not self.is_table(f"{schema}.{table}"):
return None
results = self.get_table_definitions(schema, table)
return results[0]["ddl"]
[docs]
def get_table_definitions(self, schema=None, table=None):
"""
Get the table definition (i.e. the create statement) for multiple tables.
This works similar to `get_table_def` except it runs a single query
to get the ddl for multiple tables. It supports SQL wildcards for
`schema` and `table`. Only returns the ddl for _tables_ that match
`schema` and `table` if they exist.
`Args:`
schema: str
The schema to filter by.
table: str
The table to filter by.
`Returns:`
`list` of dicts with matching tables.
"""
conditions = []
if schema:
conditions.append(f"schemaname like '{schema}'")
if table:
conditions.append(f"tablename like '{table}'")
conditions_str = " and ".join(conditions)
where_clause = f"where {conditions_str}" if conditions_str else ""
# ddl_query = pkgutil.get_data(
# __name__, "queries/v_generate_tbl_ddl.sql").decode()
sql_get_ddl = f"""
select *
from admin.v_generate_tbl_ddl
{where_clause}
"""
ddl_table = self.query(sql_get_ddl)
if ddl_table.num_rows == 0:
logger.info(f"No tables matching {schema} and {table}.")
return None
def join_sql_parts(columns, rows):
return [f"{columns[1]}.{columns[2]}", "\n".join([row[4] for row in rows])]
# The query returns the sql over multiple rows
# We need to join then into a single row
ddl_table.reduce_rows(
["table_id", "schemaname", "tablename"],
join_sql_parts,
["tablename", "ddl"],
presorted=True,
)
return ddl_table.to_dicts()
[docs]
def get_view_definition(self, view):
"""
Get the view definition (i.e. the create statement).
`Args:`
view: str
The schema.view for which to get the view definition.
`Returns:`
str
"""
schema, view = self.split_full_table_name(view)
if not self.is_view(f"{schema}.{view}"):
return None
results = self.get_view_definitions(schema, view)
return results[0]["ddl"]
[docs]
def get_view_definitions(self, schema=None, view=None):
"""
Get the view definition (i.e. the create statement) for multiple views.
This works similar to `get_view_def` except it runs a single query
to get the ddl for multiple views. It supports SQL wildcards for
`schema` and `view`. Only returns the ddl for _views_ that match
`schema` and `view` if they exist.
`Args:`
schema: str
The schema to filter by.
view: str
The view to filter by.
`Returns:`
`list` of dicts with matching views.
"""
conditions = []
if schema:
conditions.append(f"schemaname like '{schema}'")
if view:
conditions.append(f"g.viewname like '{view}'")
conditions_str = " and ".join(conditions)
where_clause = f"where {conditions_str}" if conditions_str else ""
# ddl_query = pkgutil.get_data(
# __name__, "queries/v_generate_view_ddl.sql").decode()
sql_get_ddl = f"""
select schemaname || '.' || viewname as viewname, ddl
from admin.v_generate_view_ddl g
{where_clause}
"""
ddl_view = self.query(sql_get_ddl)
if ddl_view.num_rows == 0:
logger.info(f"No views matching {schema} and {view}.")
return None
return ddl_view.to_dicts()
[docs]
@staticmethod
def split_full_table_name(full_table_name):
"""
Split a full table name into its schema and table. If a schema isn't
present, return `public` for the schema. Similarly, Redshift defaults
to the `public` schema, when one isn't provided.
Eg:
``(schema, table) = Redshift.split_full_table_name("some_schema.some_table")``
`Args:`
full_table_name: str
The table name, as "schema.table"
`Returns:`
tuple
A tuple containing (schema, table)
"""
if "." not in full_table_name:
return "public", full_table_name
try:
schema, table = full_table_name.split(".")
except ValueError as e:
if "too many values to unpack" in str(e):
raise ValueError(f"Invalid Redshift table {full_table_name}")
return schema, table
[docs]
@staticmethod
def combine_schema_and_table_name(schema, table):
"""
Creates a full table name by combining a schema and table.
`Args:`
schema: str
The schema name
table: str
The table name
`Returns:`
str
The combined full table name
"""
return f"{schema}.{table}"