import logging
import petl
logger = logging.getLogger(__name__)
class ETL(object):
    """
    Table transformation methods built on top of ``petl``.

    The methods of this class read and reassign ``self.table`` and also rely on
    ``self.columns`` and ``self.num_rows``; none of these are set up by this
    constructor.
    """

    def __init__(self):
        # Nothing to initialise here.
        return None
def head(self, n=5):
    """
    Keep only the first ``n`` rows of the table.

    `Args:`
        n: int
            How many rows to keep, counted from the top. Defaults to 5.
    `Returns:`
        `Parsons Table` and also updates self
    """
    leading_rows = petl.head(self.table, n)
    self.table = leading_rows
    return self
def tail(self, n=5):
    """
    Keep only the last ``n`` rows of the table.

    `Args:`
        n: int
            How many rows to keep, counted from the bottom. Defaults to 5.
    `Returns:`
        `Parsons Table` and also updates self
    """
    trailing_rows = petl.tail(self.table, n)
    self.table = trailing_rows
    return self
def add_column(self, column, value=None, index=None, if_exists="fail"):
    """
    Add a new column to the table.

    `Args:`
        column: str
            Name of the column to add
        value:
            A fixed value, or a callable used to calculate the value
        index: int
            Position at which the new column is inserted
        if_exists: str (options: 'fail', 'replace')
            What to do when ``column`` is already present. With ``replace`` the
            existing column is overwritten through ``fill_column``; any other
            setting raises a ``ValueError``.
    `Returns:`
        `Parsons Table` and also updates self
    """
    # Common case first: the column is new, so simply append the field.
    if column not in self.columns:
        self.table = self.table.addfield(column, value, index)
        return self

    if if_exists != "replace":
        raise ValueError(f"Column {column} already exists")

    self.fill_column(column, value)
    return self
def remove_column(self, *columns):
    r"""
    Drop one or more columns from the table.

    `Args:`
        \*columns: str
            Names of the columns to drop
    `Returns:`
        `Parsons Table` and also updates self
    """
    remaining = petl.cutout(self.table, *columns)
    self.table = remaining
    return self
def rename_column(self, column_name, new_column_name):
    """
    Give a single column a new name.

    `Args:`
        column_name: str
            The name the column has now
        new_column_name: str
            The name the column should get
    `Returns:`
        `Parsons Table` and also updates self
    `Raises:`
        ValueError if ``new_column_name`` is already a column of the table
    """
    name_is_free = new_column_name not in self.columns
    if name_is_free:
        self.table = petl.rename(self.table, column_name, new_column_name)
        return self

    raise ValueError(f"Column {new_column_name} already exists")
def rename_columns(self, column_map):
    """
    Rename multiple columns

    `Args:`
        column_map: dict
            A dictionary of columns and new names.
            The key is the old name and the value is the new name.

            Example dictionary:
            {'old_name': 'new_name',
            'old_name2': 'new_name2'}
    `Returns:`
        `Parsons Table` and also updates self
    `Raises:`
        KeyError if an old name is not a column of the table;
        ValueError if a new name is already a column of the table
    """
    # Read the header once. The previous ``self.table.columns()`` call built a
    # mapping of every column to all of its values on each membership test.
    existing = set(self.columns)

    # Check if old column name exists and new column name does not exist
    for old_name, new_name in column_map.items():
        if old_name not in existing:
            raise KeyError(f"Column name {old_name} does not exist")
        if new_name in existing:
            raise ValueError(f"Column name {new_name} already exists")

    # Uses the underlying petl method
    self.table = petl.rename(self.table, column_map)
    return self
def fill_column(self, column_name, fill_value):
    """
    Overwrite every value of a column.

    `Args:`
        column_name: str
            The column to overwrite
        fill_value:
            A fixed value, or a callable that receives the row and returns the
            value to store
    `Returns:`
        `Parsons Table` and also updates self
    """
    if not callable(fill_value):
        self.table = petl.update(self.table, column_name, fill_value)
        return self

    def compute(_current, row):
        # The existing cell value is ignored; only the row is handed on.
        return fill_value(row)

    self.table = petl.convert(self.table, column_name, compute, pass_row=True)
    return self
def fillna_column(self, column_name, fill_value):
    """
    Replace the ``None`` values of a column, leaving other values untouched.

    `Args:`
        column_name: str
            The column to fill
        fill_value:
            A fixed value, or a callable that receives the row and returns the
            value to store
    `Returns:`
        `Parsons Table` and also updates self
    """

    def is_missing(row):
        return row[column_name] is None

    if not callable(fill_value):
        self.table = petl.update(self.table, column_name, fill_value, where=is_missing)
        return self

    def compute(_current, row):
        return fill_value(row)

    self.table = petl.convert(
        self.table,
        column_name,
        compute,
        where=is_missing,
        pass_row=True,
    )
    return self
def move_column(self, column, index):
    """
    Reposition a column within the table.

    `Args:`
        column: str
            Name of the column to move
        index:
            The position the column should be moved to
    `Returns:`
        `Parsons Table` and also updates existing object.
    """
    reordered = petl.movefield(self.table, column, index)
    self.table = reordered
    return self
def convert_column(self, *column, **kwargs):
    """
    Apply a conversion to the values of one or more columns. All arguments are
    handed straight to the petl ``convert()`` function; see its documentation
    `here <https://petl.readthedocs.io/en/v0.24/transform.html#petl.convert>`_.

    `Args:`
        *column: str
            A single column or multiple columns passed as a list, followed by
            the function, method name or dictionary that performs the update
        **kwargs:
            Keyword arguments forwarded to petl ``convert()``
    `Returns:`
        `Parsons Table` and also updates self
    """  # noqa: E501
    converted = petl.convert(self.table, *column, **kwargs)
    self.table = converted
    return self
def get_column_max_width(self, column):
    """
    Measure the widest value in a column, in UTF-8 encoded bytes of its string
    form.

    `Args:`
        column: str
            The column name.
    `Returns:`
        int
            The largest width found, or 0 when the column has no values.
    """
    widths = (len(str(value).encode("utf-8")) for value in petl.values(self.table, column))
    return max(widths, default=0)
def convert_columns_to_str(self):
    """
    Convenience function that turns every column which is not purely made of
    strings into a string column (e.g. for comparison). ``None`` becomes the
    empty string.

    `Returns:`
        `Parsons Table` and also updates self
    """
    # An empty table has nothing to convert.
    if self.num_rows == 0:
        return self

    def str_or_empty(x):
        return "" if x is None else str(x)

    for stats in self.get_columns_type_stats():
        types = stats["type"]
        # Only a column whose single reported type is "str" is left alone;
        # mixed, untyped and non-string columns are all converted.
        already_str = len(types) == 1 and types[0] == "str"
        if not already_str:
            self.convert_column(stats["name"], str_or_empty)

    return self
def coalesce_columns(self, dest_column, source_columns, remove_source_columns=True):
    """
    Fill a destination column with the first truthy value found among the
    source columns, checked in the order given. The destination column is
    created when it does not exist yet.

    `Args:`
        dest_column: str
            Name of destination column
        source_columns: list
            List of source column names
        remove_source_columns: bool
            Whether to remove the source columns after the coalesce. If the destination
            column is also one of the source columns, it will not be removed.
    `Returns:`
        `Parsons Table` and also updates self
    """

    def first_populated(row):
        # Yields None when no source column holds a truthy value.
        for source_col in source_columns:
            if row.get(source_col):
                return row[source_col]
        return None

    if dest_column in self.columns:
        logger.debug(f"Coalescing {source_columns} into {dest_column}")
        self.convert_column(
            dest_column, lambda _value, row: first_populated(row), pass_row=True
        )
    else:
        logger.debug(f"Creating new column {dest_column} from {source_columns}")
        self.add_column(dest_column, first_populated)

    if not remove_source_columns:
        return self

    for source_col in source_columns:
        if source_col == dest_column:
            continue
        self.remove_column(source_col)

    return self
def map_columns(self, column_map, exact_match=True):
    """
    Standardizes column names based on multiple possible values. This method
    is helpful when your input table might have multiple and unknown column
    names.

    `Args:`
        column_map: dict
            A dictionary of columns and possible values that map to it
        exact_match: boolean
            If ``True`` will only map if an exact match. If ``False`` will
            ignore case, spaces and underscores, both in the table's column
            names and in the possible values of ``column_map``.
    `Returns:`
        `Parsons Table` and also updates self

    .. code-block:: python

        tbl = [{'fn': 'Jane'},
               {'lastname': 'Doe'},
               {'dob': '1980-01-01'}]
        column_map = {'first_name': ['fn', 'first', 'firstname'],
                      'last_name': ['ln', 'last', 'lastname'],
                      'date_of_birth': ['dob', 'birthday']}
        tbl.map_columns(column_map)
        print (tbl)
        >> {{'first_name': 'Jane', 'last_name': 'Doe', 'date_of_birth': '1980-01-01'}}
    """

    def comparable(name):
        # Loose matching drops case, underscores and spaces. Non-string
        # candidates are compared as they are.
        if exact_match or not isinstance(name, str):
            return name
        return name.lower().replace("_", "").replace(" ", "")

    def find_target(col):
        wanted = comparable(col)
        for new_name, candidates in column_map.items():
            if any(comparable(candidate) == wanted for candidate in candidates):
                return True, new_name
        return False, None

    for col in self.columns:
        # Only the first matching entry is used: once a column has been
        # renamed, a second rename would refer to a name that no longer exists.
        matched, new_name = find_target(col)
        # A column that already carries the target name needs no rename.
        if matched and new_name != col:
            self.rename_column(col, new_name)

    return self
def map_and_coalesce_columns(self, column_map):
    """
    Coalesces columns based on multiple possible values. The columns in the map
    do not need to be in your table, so you can create a map with all possibilities.
    The coalesce will occur in the order that the columns are listed, unless the
    destination column name already exists in the table, in which case that
    value will be preferenced. This method is helpful when your input table might
    have multiple and unknown column names.

    ``column_map`` itself is not modified.

    `Args:`
        column_map: dict
            A dictionary of columns and possible values that map to it
    `Returns:`
        `Parsons Table` and also updates self

    .. code-block:: python

        tbl = [{'first': None},
               {'fn': 'Jane'},
               {'lastname': 'Doe'},
               {'dob': '1980-01-01'}]
        column_map = {'first_name': ['fn', 'first', 'firstname'],
                      'last_name': ['ln', 'last', 'lastname'],
                      'date_of_birth': ['dob', 'birthday']}
        tbl.map_and_coalesce_columns(column_map)
        print (tbl)
        >> {{'first_name': 'Jane', 'last_name': 'Doe', 'date_of_birth': '1980-01-01'}}
    """
    for key, value in column_map.items():
        # Re-read the columns for every key: the previous iteration may have
        # added and removed columns.
        present = self.columns

        # Keep only the candidates that are actually in the table. A new list
        # is built because removing from a list while iterating over it skips
        # elements, and because the caller's mapping must stay untouched.
        coalesce_list = [item for item in value if item in present]

        # if the key from the mapping dict already exists in the table,
        # rename it so it can be coalesced with other possible columns
        if key in present:
            temp_name = f"{key}_temp"
            self.rename_column(key, temp_name)
            coalesce_list.insert(0, temp_name)

        # coalesce columns
        self.coalesce_columns(key, coalesce_list, remove_source_columns=True)

    return self
def get_column_types(self, column):
    """
    Collect the distinct value types that petl's ``typeset`` reports for a
    column.

    `Args:`
        column: str
            Name of the column to analyze
    `Returns:`
        list
            The types found in the column
    """
    found_types = petl.typeset(self.table, column)
    return list(found_types)
def get_columns_type_stats(self):
    """
    Return descriptive stats for all columns

    `Returns:`
        list
            A list of dicts, each containing a column 'name' and a 'type' list
    """
    # Only the column names are needed. ``self.table.columns()`` would load
    # every value of every column just to provide them.
    return [{"name": col, "type": self.get_column_types(col)} for col in self.columns]
def convert_table(self, *args):
    r"""
    Apply a conversion to every cell of the table by running ``convert_column``
    over all columns. Useful for cleaning fields and data hygiene functions
    such as regex. The conversion is performed by the petl ``convert()``
    function; example usage can be found
    `here <https://petl.readthedocs.io/en/v0.24/transform.html#petl.convert>`_.

    `Args:`
        \*args: str, method or variable
            The update function, method, or variable to process the update.
    `Returns:`
        `Parsons Table` and also updates self
    """
    every_column = self.columns
    self.convert_column(every_column, *args)
    return self
def unpack_dict(
    self,
    column,
    keys=None,
    include_original=False,
    sample_size=5000,
    missing=None,
    prepend=True,
    prepend_value=None,
):
    """
    Spread the dictionaries stored in one column over separate columns, one
    per dictionary key.

    `Args:`
        column: str
            The column name to unpack
        keys: list
            The dict keys in the column to unpack. If ``None`` will unpack
            all.
        include_original: boolean
            Retain original column after unpacking
        sample_size: int
            Number of rows to sample before determining columns
        missing: str
            If a value is missing, the value to fill it with
        prepend:
            Prefix the new column names. Useful for avoiding duplicate
            column names
        prepend_value:
            Prefix to use if ``prepend=True``. If None, the column name is
            used.
    `Returns:`
        `Parsons Table` and also updates self
    """
    if prepend:
        prefix = column if prepend_value is None else prepend_value

        def with_prefix(value):
            return self._prepend_dict(value, prefix)

        # Rename the dict keys first so the unpacked columns carry the prefix.
        self.table = petl.convert(self.table, column, with_prefix)

    unpacked = petl.unpackdict(
        self.table,
        column,
        keys=keys,
        includeoriginal=include_original,
        samplesize=sample_size,
        missing=missing,
    )
    self.table = unpacked
    return self
def unpack_list(
    self,
    column,
    include_original=False,
    missing=None,
    replace=False,
    max_columns=None,
):
    """
    Spread the lists stored in one column over separate, numbered columns
    (``<column>_0``, ``<column>_1``, ...).

    .. code-block:: python

      # Begin with a list in column
      json = [{'id': '5421',
               'name': 'Jane Green',
               'phones': ['512-699-3334', '512-222-5478']
              }
             ]

      tbl = Table(json)
      print (tbl)
      >>> {'id': '5421', 'name': 'Jane Green', 'phones': ['512-699-3334', '512-222-5478']}

      tbl.unpack_list('phones', replace=True)
      print (tbl)
      >>> {'id': '5421', 'name': 'Jane Green', 'phones_0': '512-699-3334', 'phones_1': '512-222-5478'} # noqa: E501

    `Args:`
        column: str
            The column name to unpack
        include_original: boolean
            Retain original column after unpacking
        missing: str
            If a value is missing, the value to fill it with
        replace: boolean
            Return new table or replace existing
        max_columns: int
            When given (and at least one value was found), the number of
            columns to create
    `Returns:`
        ``None`` when ``replace`` is true (the table is updated in place);
        otherwise the unpacked petl table. Values of ``column`` that are not
        lists are wrapped into one-element lists on this table in both cases.
    """

    def as_list(value):
        return value if isinstance(value, list) else [value]

    # Wrap non-list values so that unpacking cannot trip over them.
    self.table = petl.convert(self.table, column, as_list)

    # Length of the longest list in the column; 0 for an empty table.
    col_count = max((len(row[column]) for row in self.cut(column)), default=0)

    # A supplied max_columns overrides the measured count.
    if col_count > 0 and max_columns:
        col_count = max_columns

    # Column names of the form "<column>_<position>".
    new_cols = [column + "_" + str(position) for position in range(col_count)]

    tbl = petl.unpack(
        self.table,
        column,
        new_cols,
        include_original=include_original,
        missing=missing,
    )

    if not replace:
        return tbl
    self.table = tbl
def unpack_nested_columns_as_rows(self, column, key="id", expand_original=False):
    """
    Unpack list or dict values from one column into separate rows.
    Not recommended for JSON columns (i.e. lists of dicts), but can handle columns
    with any mix of types. Makes use of PETL's `melt()` method.

    `Args:`
        column: str
            The column name to unpack
        key: str
            The column to use as a key when unpacking. Defaults to `id`
        expand_original: boolean or int
            If `True`: Add resulting unpacked rows (with all other columns) to original
            If `int`: Add to original unless the max added per key is above the given number
            If `False` (default): Return unpacked rows (with `key` column only) as standalone
            Removes packed list and dict rows from original either way.
    `Returns:`
        If `expand_original`, original table with packed rows replaced by unpacked rows
        Otherwise, standalone table with key column and unpacked values only
    """
    import hashlib

    from parsons.etl.table import Table

    def row_hash(row):
        # Unique-id helper: md5 over the concatenated string form of all
        # fields of the row (not used for security purposes).
        return hashlib.md5(
            str.encode("".join([str(x) for x in row])), usedforsecurity=False
        ).hexdigest()

    # ``bool`` is a subclass of ``int``, so booleans are excluded explicitly:
    # only a real number is treated as a threshold.
    if isinstance(expand_original, int) and not isinstance(expand_original, bool):
        lengths = {len(row[column]) for row in self if isinstance(row[column], (dict, list))}
        # ``default=0`` covers a table without any list/dict value in the column.
        max_len = max(lengths, default=0)
        if max_len > expand_original:
            expand_original = False

    if expand_original:
        # Include all columns and filter out other non-dict types in table_list
        table = self
        table_list = table.select_rows(lambda row: isinstance(row[column], list))
    else:
        # Otherwise, include only key and column, but keep all non-dict types in table_list
        table = self.cut(key, column)
        table_list = table.select_rows(lambda row: not isinstance(row[column], dict))

    # All the columns other than column to ignore while melting
    ignore_cols = list(table.columns)
    ignore_cols.remove(column)

    # Unpack lists as separate columns
    table_list.unpack_list(column, replace=True)

    # Rename the columns to retain only the number
    for col in table_list.columns:
        if f"{column}_" in col:
            table_list.rename_column(col, col.replace(f"{column}_", ""))

    # Filter dicts and unpack as separate columns
    table_dict = table.select_rows(lambda row: isinstance(row[column], dict))
    table_dict.unpack_dict(column, prepend=False)

    # Use melt to pivot both sets of columns into their own Tables and clean out None values
    melted_list = Table(petl.melt(table_list.table, ignore_cols))
    melted_dict = Table(petl.melt(table_dict.table, ignore_cols))

    melted_list.remove_null_rows("value")
    melted_dict.remove_null_rows("value")

    melted_list.rename_column("variable", column)
    melted_dict.rename_column("variable", column)

    # Combine the list and dict Tables
    melted_list.concat(melted_dict)

    if expand_original:
        # Add unpacked rows to the original table (minus packed rows)
        orig = self.select_rows(lambda row: not isinstance(row[column], (dict, list)))
        orig.concat(melted_list)

        # Add unique id column by hashing all the other fields
        if "uid" not in self.columns:
            orig.add_column("uid", row_hash)
            orig.move_column("uid", 0)

        # Rename value column in case this is done again to this Table
        orig.rename_column("value", f"{column}_value")

        # Keep column next to column_value
        orig.move_column(column, -1)
        return orig

    # Standalone output: the packed column is dropped from this table.
    self.remove_column(column)

    # Add unique id column by hashing all the other fields
    melted_list.add_column("uid", row_hash)
    melted_list.move_column("uid", 0)
    return melted_list
def long_table(
    self,
    key,
    column,
    key_rename=None,
    retain_original=False,
    prepend=True,
    prepend_value=None,
):
    """
    Build a new long Parsons table out of one column, keeping the key
    column(s) as foreign key.

    .. code-block:: python

      # Begin with nested dicts in a column
      json = [{'id': '5421',
               'name': 'Jane Green',
               'emails': [{'home': 'jane@gmail.com'},
                          {'work': 'jane@mywork.com'}
                         ]
              }
             ]
      tbl = Table(json)
      print (tbl)
      >>> {'id': '5421', 'name': 'Jane Green', 'emails': [{'home': 'jane@gmail.com'}, {'work': 'jane@mywork.com'}]} # noqa: E501

      # Create skinny table of just the nested dicts
      email_skinny = tbl.long_table(['id'], 'emails')

      print (email_skinny)
      >>> {'id': '5421', 'emails_home': 'jane@gmail.com', 'emails_work': None}
      >>> {'id': '5421', 'emails_home': None, 'emails_work': 'jane@mywork.com'}

    `Args:`
        key: lst
            The columns to retain in the long table (e.g. foreign keys)
        column: str
            The column name to make long
        key_rename: dict
            The new name for the foreign key to better identify it. For
            example, you might want to rename ``id`` to ``person_id``.
            Ex. {'KEY_NAME': 'NEW_KEY_NAME'}
        retain_original: boolean
            Retain the original column from the source table.
        prepend:
            Prepend the column name of the unpacked values. Useful for
            avoiding duplicate column names
        prepend_value:
            Value to prepend new columns if ``prepend=True``. If None, will
            set to column name.
    `Returns:`
        Parsons Table
            The new long table
    """
    # A single key given as a plain string becomes a one-element list.
    key_columns = [key] if type(key) is str else key

    # Key(s) plus the column, with the lists spread over numbered columns.
    lt = self.cut(*key_columns, column)
    lt.unpack_list(column, replace=True)

    # Melt into one row per list element and drop the numbered column names.
    lt.table = petl.melt(lt.table, key_columns)
    lt = lt.cut(*key_columns, "value")

    # Restore the original column name and discard the empty slots.
    lt.rename_column("value", column)
    lt.remove_null_rows(column)

    # Optional renaming of the key column(s).
    for old_key, new_key in (key_rename or {}).items():
        lt.rename_column(old_key, new_key)

    # When the first value is a dict, the column is unpacked once more.
    has_rows = lt.num_rows > 0
    if has_rows and isinstance(lt.table[column][0], dict):
        lt.unpack_dict(column, prepend=prepend, prepend_value=prepend_value)

    if not retain_original:
        self.remove_column(column)

    return lt
def cut(self, *columns):
    r"""
    Build a new table that holds only the selected columns.

    `Args:`
        \*columns: str
            Columns in the parsons table
    `Returns:`
        A new parsons table containing the selected columns
    """
    from parsons.etl.table import Table

    selection = petl.cut(self.table, *columns)
    return Table(selection)
def select_rows(self, *filters):
    r"""
    Build a new table from the rows that satisfy the passed filters.

    Example filters:

    .. code-block:: python

        tbl = Table([['foo', 'bar', 'baz'],
                     ['c', 4, 9.3],
                     ['a', 2, 88.2],
                     ['b', 1, 23.3],])

        # You can structure the filter in multiple ways

        # Lambda Function
        tbl2 = tbl.select_rows(lambda row: row.foo == 'a' and row.baz > 88.1)
        tbl2
        >>> {'foo': 'a', 'bar': 2, 'baz': 88.2}

        # Expression String
        tbl3 = tbl.select_rows("{foo} == 'a' and {baz} > 88.1")
        tbl3
        >>> {'foo': 'a', 'bar': 2, 'baz': 88.2}

    `Args:`
        \*filters: function or str
    `Returns:`
        A new parsons table containing the selected rows
    """
    from parsons.etl.table import Table

    matching = petl.select(self.table, *filters)
    return Table(matching)
def remove_null_rows(self, columns, null_value=None):
    """
    Drop the rows whose value in a column is the null value (``None`` by
    default). When several columns are passed as a list, a row is dropped if
    any of those columns holds the null value.

    `Args:`
        columns: str or list
            The column or columns to analyze
        null_value: int or float or str
            The null value
    `Returns:`
        `Parsons Table` and also updates self
    """
    column_list = [columns] if isinstance(columns, str) else columns

    for col in column_list:
        self.table = petl.selectisnot(self.table, col, null_value)

    return self
def _prepend_dict(self, dict_obj, prepend):
# Internal method to rename dict keys
new_dict = {}
for k, v in dict_obj.items():
new_dict[prepend + "_" + k] = v
return new_dict
def stack(self, *tables, missing=None):
    """
    Stack Parsons tables on top of one another.

    Similar to ``table.concat()``, except no attempt is made to align fields from
    different tables.

    `Args:`
        tables: Parsons Table or list
            One or more tables as separate arguments, or a single list (or
            tuple) of tables
        missing:
            The value to use when padding missing values
    `Returns:`
        ``None``
    """
    # ``*tables`` always arrives as a tuple, so a list of tables shows up as
    # its only element; unwrap it to support the documented list form.
    if len(tables) == 1 and isinstance(tables[0], (list, tuple)):
        tables = tables[0]

    petl_tables = [tbl.table for tbl in tables]
    self.table = petl.stack(self.table, *petl_tables, missing=missing)
def concat(self, *tables, missing=None):
    """
    Concatenates one or more tables onto this one.

    Note that the tables do not need to share exactly the same fields.
    Any missing fields will be padded with None, or whatever is provided via the
    ``missing`` keyword argument.

    `Args:`
        tables: Parsons Table or list
            One or more tables as separate arguments, or a single list (or
            tuple) of tables
        missing:
            The value to use when padding missing values
    `Returns:`
        ``None``
    """
    # ``*tables`` always arrives as a tuple, so a list of tables shows up as
    # its only element; unwrap it to support the documented list form.
    if len(tables) == 1 and isinstance(tables[0], (list, tuple)):
        tables = tables[0]

    petl_tables = [tbl.table for tbl in tables]
    self.table = petl.cat(self.table, *petl_tables, missing=missing)
def chunk(self, rows):
    """
    Split the table into consecutive smaller tables of ``rows`` rows each. When
    the row count is not a multiple of ``rows``, the last table holds the
    remainder.

    `Args:`
        rows: int
            The number of rows of each new Parsons table
    `Returns:`
        List of Parsons tables
    """
    from parsons.etl import Table

    pieces = []
    for start in range(0, self.num_rows, rows):
        window = petl.rowslice(self.table, start, start + rows)
        pieces.append(Table(window))
    return pieces
[docs] @staticmethod
def get_normalized_column_name(column_name):
"""
Returns a column name with whitespace removed, non-alphanumeric characters removed, and
everything lowercased.
`Returns:`
str
Normalized column name
"""
column_name = column_name.lower().strip()
return "".join(c for c in column_name if c.isalnum())
def match_columns(
    self,
    desired_columns,
    fuzzy_match=True,
    if_extra_columns="remove",
    if_missing_columns="add",
):
    """
    Changes the column names and ordering in this Table to match a list of desired column
    names.

    `Args:`
        desired_columns: list
            Ordered list of desired column names
        fuzzy_match: bool
            Whether to normalize column names when matching against the desired column names,
            removing whitespace and non-alphanumeric characters, and lowercasing everything.
            Eg. With this flag set, "FIRST NAME" would match "first_name".
            If the Table has two columns that normalize to the same string (eg. "FIRST NAME"
            and "first_name"), the latter will be considered an extra column.
        if_extra_columns: string
            If the Table has columns that don't match any desired columns, either 'remove'
            them, 'ignore' them, or 'fail' (raising an error).
        if_missing_columns: string
            If the Table is missing some of the desired columns, either 'add' them (with a
            value of None), 'ignore' them, or 'fail' (raising an error).
    `Returns:`
        `Parsons Table` and also updates self
    `Raises:`
        TypeError when a column is missing or extra and the matching strategy is 'fail',
        or when a strategy argument is not one of its three valid options. An invalid
        strategy is only detected once a missing (or extra) column is actually met.
    """

    from parsons.etl import Table  # Just trying to avoid recursive imports.

    normalize_fn = Table.get_normalized_column_name if fuzzy_match else (lambda s: s)

    # Create a mapping of our "normalized" name to the original column name.
    # The columns are walked in reverse so that, when two columns normalize to the same
    # string, the entry of the earlier column is the one that remains in the mapping.
    current_columns_normalized = {normalize_fn(col): col for col in reversed(self.columns)}

    # Track any columns we need to add to our current table from our desired columns
    columns_to_add = []
    # We are going to do a "cut" later to trim our table and re-order the columns, but
    # we won't have renamed our columns yet, so we need to remember their un-normalized
    # form
    cut_columns = []
    # We are going to also rename our columns AFTER we cut, so we want to remember their
    # normalized names
    final_header = []

    # Loop through our desired columns -- the columns we want to see in our final table
    for desired_column in desired_columns:
        normalized_desired = normalize_fn(desired_column)
        # Try to find our desired column in our Table
        if normalized_desired not in current_columns_normalized:
            # If we can't find our desired column in our current columns, then it's "missing"
            if if_missing_columns == "fail":
                # If our missing strategy is to fail, raise an exception
                raise TypeError(f"Table is missing column {desired_column}")
            elif if_missing_columns == "add":
                # We have to add to our table
                columns_to_add.append(desired_column)
                # We will need to remember this column when we cut down to desired columns
                cut_columns.append(desired_column)
                # This will be in the final table
                final_header.append(desired_column)
            elif if_missing_columns != "ignore":
                # If it's not ignore, add, or fail, then it's not a valid strategy
                raise TypeError(
                    f"Invalid option {if_missing_columns} for " "argument `if_missing_columns`"
                )
        else:
            # We have found this in our current columns, so take it out of our list to search
            current_column = current_columns_normalized.pop(normalized_desired)
            # Add the column to our intermediate table as the old column name
            cut_columns.append(current_column)
            # Add to our final header list as the "desired" name
            final_header.append(desired_column)

    # Look for any "extra" columns from our current table that aren't in our desired columns
    # (whatever is left in the mapping after the pops above)
    for current_column in current_columns_normalized.values():
        # Figure out what to do with our "extra" columns
        if if_extra_columns == "fail":
            # If our extra strategy is to fail, raise an exception
            raise TypeError(f"Table has extra column {current_column}")
        elif if_extra_columns == "ignore":
            # If we're "ignore"ing our extra columns, we should keep them by adding them to
            # our intermediate and final columns list
            cut_columns.append(current_column)
            final_header.append(current_column)
        elif if_extra_columns != "remove":
            # If it's not ignore, remove, or fail, then it's not a valid strategy
            raise TypeError(
                f"Invalid option {if_extra_columns} for " "argument `if_extra_columns`"
            )

    # Add any columns we need to add
    for column in columns_to_add:
        self.table = petl.addfield(self.table, column, None)

    # Cut down to just the columns we care about
    self.table = petl.cut(self.table, *cut_columns)

    # Rename any columns; cut_columns and final_header were filled in lockstep, so the
    # header positions line up with the cut columns
    self.table = petl.setheader(self.table, final_header)

    return self
def reduce_rows(self, columns, reduce_func, headers, presorted=False, **kwargs):
    """
    Group the rows by one or more columns and collapse each group into a
    single row. Based on the `rowreduce petl function <https://petl.readthedocs.io/en/stable/transform.html#petl.transform.reductions.rowreduce>`_.

    For example, the output from the query to get a table's definition is
    returned as one component per row. The `reduce_rows` method can be used
    to reduce all those to a single row containing the entire query.

    .. code-block:: python

      >>> ddl = rs.query(sql_to_get_table_ddl)
      >>> ddl.table

      +--------------+--------------+----------------------------------------------------+
      | schemaname   | tablename    | ddl                                                |
      +==============+==============+====================================================+
      | 'db_scratch' | 'state_fips' | '--DROP TABLE db_scratch.state_fips;'              |
      +--------------+--------------+----------------------------------------------------+
      | 'db_scratch' | 'state_fips' | 'CREATE TABLE IF NOT EXISTS db_scratch.state_fips' |
      +--------------+--------------+----------------------------------------------------+
      | 'db_scratch' | 'state_fips' | '('                                                |
      +--------------+--------------+----------------------------------------------------+
      | 'db_scratch' | 'state_fips' | '\\tstate VARCHAR(1024) ENCODE RAW'                 |
      +--------------+--------------+----------------------------------------------------+
      | 'db_scratch' | 'state_fips' | '\\t,stusab VARCHAR(1024) ENCODE RAW'               |
      +--------------+--------------+----------------------------------------------------+

      >>> reducer_fn = lambda columns, rows: [
      ...     f"{columns[0]}.{columns[1]}",
      ...     '\\n'.join([row[2] for row in rows])]
      >>> ddl.reduce_rows(
      ...     ['schemaname', 'tablename'],
      ...     reducer_fn,
      ...     ['tablename', 'ddl'],
      ...     presorted=True)
      >>> ddl.table

      +-------------------------+---------------------------------------------------------------+
      | tablename               | ddl                                                           |
      +=========================+===============================================================+
      | 'db_scratch.state_fips' | '--DROP TABLE db_scratch.state_fips;\\nCREATE TABLE IF NOT     |
      |                         | EXISTS db_scratch.state_fips\\n(\\n\\tstate VARCHAR(1024)       |
      |                         | ENCODE RAW\\n\\t,stusab VARCHAR(1024) ENCODE RAW'               |
      +-------------------------+---------------------------------------------------------------+

    `Args:`
        columns: list
            The column(s) by which to group the rows.
        reduce_func: fun
            The function by which to reduce the rows. Should take the 2
            arguments, the columns list and the rows list and return a list.
            `reducer(columns: list, rows: list) -> list;`
        headers: list
            The list of headers for modified table. The length of `headers`
            should match the length of the list returned by the reduce
            function.
        presorted: bool
            If false, the row will be sorted.
        **kwargs:
            Further keyword arguments forwarded to petl ``rowreduce``
    `Returns:`
        `Parsons Table` and also updates self
    """  # noqa: E501,E261
    reduced = petl.rowreduce(
        self.table,
        columns,
        reduce_func,
        header=headers,
        presorted=presorted,
        **kwargs,
    )
    self.table = reduced
    return self
def sort(self, columns=None, reverse=False):
    """
    Sort the rows of the table.

    `Args:`
        columns: list or str
            Sort by a single column or a list of columns. If ``None`` then
            will sort columns from left to right.
        reverse: boolean
            Sort rows in reverse order.
    `Returns:`
        `Parsons Table` and also updates self
    """
    ordered = petl.sort(self.table, key=columns, reverse=reverse)
    self.table = ordered
    return self
def use_petl(self, petl_method, *args, **kwargs):
    """
    Call a petl function on the current table.

    This convenience method exposes the petl functions to the current
    Table. This is useful in cases where one might need a ``petl`` function
    that has not yet been implemented for ``parsons.Table``.

    The petl function is applied exactly once; with ``update_table=True`` the
    returned table and the updated table hold that same result.

    .. code-block:: python

      >>> # https://petl.readthedocs.io/en/v1.6.0/transform.html#petl.transform.basics.skipcomments
      >>> tbl = Table([
      ...     ['col1', 'col2'],
      ...     ['# this is a comment row',],
      ...     ['a', 1],
      ...     ['#this is another comment', 'this is also ignored'],
      ...     ['b', 2]
      ... ])
      >>> tbl.use_petl('skipcomments', '#', update_table=True)

      {'col1': 'a', 'col2': 1}
      {'col1': 'b', 'col2': 2}

      >>> tbl.table

      +------+------+
      | col1 | col2 |
      +======+======+
      | 'a'  |    1 |
      +------+------+
      | 'b'  |    2 |
      +------+------+

    `Args:`
        petl_method: str
            The ``petl`` function to call
        update_table: bool
            If ``True``, updates the ``parsons.Table``. Defaults to
            ``False``.
        to_petl: bool
            If ``True``, returns a petl table, otherwise a ``parsons.Table``.
            Defaults to ``False``.
        *args: Any
            The arguments to pass to the petl function.
        **kwargs: Any
            The keyword arguments to pass to the petl function.
    `Returns:`
        `parsons.Table` or `petl` table
    """  # noqa: E501
    # These two options are for this method only and must not reach petl.
    update_table = kwargs.pop("update_table", False)
    to_petl = kwargs.pop("to_petl", False)

    # Run the petl function a single time. Running it again on the already
    # updated table would transform the data twice.
    result = getattr(petl, petl_method)(self.table, *args, **kwargs)

    if update_table:
        self.table = result

    if to_petl:
        return result

    from parsons.etl.table import Table

    return Table(result)
def deduplicate(self, keys=None, presorted=False):
    """
    Remove duplicate rows, optionally judging duplication on the ``keys``
    columns only.

    All columns named in ``keys`` are considered together, not one by one:
    with ``keys=['a', 'b']`` a record is only removed when it equals another
    record in both column ``a`` and column ``b``.

    .. code-block:: python

      >>> tbl = Table([['a', 'b'], [1, 3], [1, 2], [1, 2], [2, 3]])
      >>> tbl.deduplicate('a')
      >>> # removes all subsequent rows with {'a': 1}
      >>> tbl.table

      +---+---+
      | a | b |
      +===+===+
      | 1 | 3 |
      +---+---+
      | 2 | 3 |
      +---+---+

      >>> tbl = Table([['a', 'b'], [1, 3], [1, 2], [1, 2], [2, 3]]) # reset
      >>> tbl.deduplicate(['a', 'b'])
      >>> # sorted on both ('a', 'b') so (1, 2) was placed before (1, 3)
      >>> # did not remove second instance of {'a': 1} or {'b': 3}
      >>> tbl.table

      +---+---+
      | a | b |
      +===+===+
      | 1 | 2 |
      +---+---+
      | 1 | 3 |
      +---+---+
      | 2 | 3 |
      +---+---+

      >>> tbl = Table([['a', 'b'], [1, 3], [1, 2], [1, 2], [2, 3]]) # reset
      >>> tbl.deduplicate('a').deduplicate('b')
      >>> # can chain method to sort/dedupe on 'a', then sort/dedupe on 'b'
      >>> tbl.table

      +---+---+
      | a | b |
      +===+===+
      | 1 | 3 |
      +---+---+

      >>> tbl = Table([['a', 'b'], [1, 3], [1, 2], [1, 2], [2, 3]]) # reset
      >>> tbl.deduplicate('b').deduplicate('a')
      >>> # Order DOES matter when deduping on one column at a time
      >>> tbl.table

      +---+---+
      | a | b |
      +===+===+
      | 1 | 2 |
      +---+---+

    `Args:`
        keys: str or list[str] or None
            keys to deduplicate (and optionally sort) on.
        presorted: bool
            If false, the row will be sorted.
    `Returns`:
        `Parsons Table` and also updates self
    """
    self.table = petl.transform.dedup.distinct(self.table, key=keys, presorted=presorted)
    return self