import petl
import json
import io
import gzip
from typing import Optional
from parsons.utilities import files, zip_archive
class ToFrom(object):
def to_dataframe(self, index=None, exclude=None, columns=None, coerce_float=False):
"""
Outputs table as a Pandas Dataframe
`Args:`
index: str, list
Field of array to use as the index, alternately a specific set
of input labels to use
exclude: list
Columns or fields to exclude
columns: list
Column names to use. If the passed data do not have names
associated with them, this argument provides names for the
columns. Otherwise this argument indicates the order of the
columns in the result (any names not found in the data will
become all-NA columns)
coerce_float: boolean
Attempt to convert non-string, non-numeric values (such as
decimal.Decimal) to floating point
`Returns:`
dataframe
Pandas DataFrame object
"""
return petl.todataframe(
self.table,
index=index,
exclude=exclude,
columns=columns,
coerce_float=coerce_float,
)
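# Usage sketch (illustrative, not part of the module): assumes ``tbl`` is a
# populated parsons.Table (which mixes in ToFrom); the "id" field name is
# hypothetical.
#
#   df = tbl.to_dataframe(index="id")
#   print(df.describe())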
def to_html(
self,
local_path=None,
encoding=None,
errors="strict",
index_header=False,
caption=None,
tr_style=None,
td_styles=None,
truncate=None,
):
"""
Outputs table to html.
.. warning::
If a file already exists at the given location, it will be
overwritten.
`Args:`
local_path: str
The path to write the html locally. If not specified, a temporary file will be
created and returned, and that file will be removed automatically when the script
is done running.
encoding: str
The encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
index_header: boolean
Prepend index to column names; Defaults to False.
caption: str
A caption to include with the html table.
tr_style: str or callable
Style to be applied to the table row.
td_styles: str, dict or callable
Styles to be applied to the table cells.
truncate: int
Length of cell data.
`Returns:`
str
The path of the new file
"""
if not local_path:
local_path = files.create_temp_file(suffix=".html")
petl.tohtml(
self.table,
source=local_path,
encoding=encoding,
errors=errors,
caption=caption,
index_header=index_header,
tr_style=tr_style,
td_styles=td_styles,
truncate=truncate,
)
return local_path
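# Usage sketch (illustrative): write an HTML report for a hypothetical
# ``tbl`` parsons.Table; the path and caption below are made up.
#
#   html_path = tbl.to_html("/tmp/report.html", caption="Weekly signups")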
def to_csv(
self,
local_path=None,
temp_file_compression=None,
encoding=None,
errors="strict",
write_header=True,
csv_name=None,
**csvargs,
):
"""
Outputs table to a CSV. Additional keyword arguments are passed to ``csv.writer()``. So,
e.g., to override the delimiter from the default CSV dialect, provide the delimiter
keyword argument.
.. warning::
If a file already exists at the given location, it will be
overwritten.
`Args:`
local_path: str
The path to write the csv locally. If it ends in ".gz" or ".zip", the file will be
compressed. If not specified, a temporary file will be created and returned,
and that file will be removed automatically when the script is done running.
temp_file_compression: str
If a temp file is requested (ie. no ``local_path`` is specified), the compression
type for that file. Currently "None", "gzip" or "zip" are supported.
If a ``local_path`` is specified, this argument is ignored.
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
write_header: boolean
Include header in output
csv_name: str
If ``zip`` compression (either specified or inferred), the name of csv file
within the archive.
\**csvargs: kwargs
``csv_writer`` optional arguments
`Returns:`
str
The path of the new file
""" # noqa: W605
# If a zip archive.
if files.zip_check(local_path, temp_file_compression):
return self.to_zip_csv(
archive_path=local_path,
encoding=encoding,
errors=errors,
write_header=write_header,
csv_name=csv_name,
**csvargs,
)
if not local_path:
suffix = ".csv" + files.suffix_for_compression_type(temp_file_compression)
local_path = files.create_temp_file(suffix=suffix)
# Create normal csv/.gzip
petl.tocsv(
self.table,
source=local_path,
encoding=encoding,
errors=errors,
write_header=write_header,
**csvargs,
)
return local_path
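# Usage sketch (illustrative): for a hypothetical ``tbl`` parsons.Table,
# write a gzipped CSV to an explicit path, or let a temp file be created.
# Extra kwargs (e.g. ``delimiter``) flow through to ``csv.writer()``.
#
#   gz_path = tbl.to_csv("people.csv.gz")
#   temp_path = tbl.to_csv(temp_file_compression="gzip", delimiter="|")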
def append_csv(self, local_path, encoding=None, errors="strict", **csvargs):
"""
Appends table to an existing CSV.
Additional keyword arguments
are passed to ``csv.writer()``. So, e.g., to override the delimiter
from the default CSV dialect, provide the delimiter keyword argument.
`Args:`
local_path: str
The local path of an existing CSV file. If it ends in ".gz", the file will
be compressed.
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
\**csvargs: kwargs
``csv_writer`` optional arguments
`Returns:`
str
The path of the file
""" # noqa: W605
petl.appendcsv(self.table, source=local_path, encoding=encoding, errors=errors, **csvargs)
return local_path
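# Usage sketch (illustrative): append rows from a second, hypothetical
# parsons.Table to a CSV created earlier; no header row is re-written.
#
#   path = tbl.to_csv("people.csv")
#   more_rows_tbl.append_csv(path)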
def to_zip_csv(
self,
archive_path=None,
csv_name=None,
encoding=None,
errors="strict",
write_header=True,
if_exists="replace",
**csvargs,
):
"""
Outputs table to a CSV in a zip archive. Additional keyword arguments are passed to
``csv.writer()``. So, e.g., to override the delimiter from the default CSV dialect,
provide the delimiter keyword argument. Use this method if you would like to write
multiple csv files to the same archive.
.. warning::
If a file already exists in the archive, it will be overwritten.
`Args:`
archive_path: str
The path to the zip archive. If not specified, a temporary file will be created and
returned, and that file will be removed automatically when the script is done
running.
csv_name: str
The name of the csv file to be stored in the archive. If ``None`` will use
the archive name.
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
write_header: boolean
Include header in output
if_exists: str
If archive already exists, one of 'replace' or 'append'
\**csvargs: kwargs
``csv_writer`` optional arguments
`Returns:`
str
The path of the archive
""" # noqa: W605
if not archive_path:
archive_path = files.create_temp_file(suffix=".zip")
cf = self.to_csv(encoding=encoding, errors=errors, write_header=write_header, **csvargs)
if not csv_name:
csv_name = files.extract_file_name(archive_path, include_suffix=False) + ".csv"
return zip_archive.create_archive(archive_path, cf, file_name=csv_name, if_exists=if_exists)
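# Usage sketch (illustrative): write two hypothetical parsons.Tables into the
# same zip archive by appending the second csv to the existing archive.
#
#   archive = people_tbl.to_zip_csv(archive_path="export.zip", csv_name="people.csv")
#   orgs_tbl.to_zip_csv(archive_path=archive, csv_name="orgs.csv", if_exists="append")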
def to_json(self, local_path=None, temp_file_compression=None, line_delimited=False):
"""
Outputs table to a JSON file
.. warning::
If a file already exists at the given location, it will be
overwritten.
`Args:`
local_path: str
The path to write the JSON locally. If it ends in ".gz", it will be
compressed first. If not specified, a temporary file will be created and returned,
and that file will be removed automatically when the script is done running.
temp_file_compression: str
If a temp file is requested (ie. no ``local_path`` is specified), the compression
type for that file. Currently "None" and "gzip" are supported.
If a ``local_path`` is specified, this argument is ignored.
line_delimited: bool
Whether the file will be line-delimited JSON (with a row on each line), or a proper
JSON file.
`Returns:`
str
The path of the new file
"""
if not local_path:
suffix = ".json" + files.suffix_for_compression_type(temp_file_compression)
local_path = files.create_temp_file(suffix=suffix)
# Note we don't use the much simpler petl.tojson(), since that method reads the whole
# table into memory before writing to file.
if files.is_gzip_path(local_path):
open_fn = gzip.open
mode = "w+t"
else:
open_fn = open
mode = "w"
with open_fn(local_path, mode) as file:
if not line_delimited:
file.write("[")
i = 0
for row in self:
if i:
if not line_delimited:
file.write(",")
file.write("\n")
i += 1
json.dump(row, file)
if not line_delimited:
file.write("]")
return local_path
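# Usage sketch (illustrative): for a hypothetical ``tbl`` parsons.Table,
# write either a standard JSON array or newline-delimited JSON.
#
#   json_path = tbl.to_json("rows.json")
#   ndjson_path = tbl.to_json("rows.jsonl", line_delimited=True)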
def to_dicts(self):
"""
Output table as a list of dicts.
`Returns:`
list
"""
return list(petl.dicts(self.table))
def to_sftp_csv(
self,
remote_path,
host,
username,
password,
port=22,
encoding=None,
compression=None,
errors="strict",
write_header=True,
rsa_private_key_file=None,
**csvargs,
):
"""
Writes the table to a CSV file on a remote SFTP server
`Args:`
remote_path: str
The remote path of the file. If it ends in '.gz', the file will be compressed.
host: str
The remote host
username: str
The username to access the SFTP server
password: str
The password to access the SFTP server
port: int
The port number of the SFTP server
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
compression: str
The compression type for the csv. Currently "None", "zip" and "gzip" are
supported. If not specified, it is inferred from the ``remote_path`` suffix.
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
write_header: boolean
Include header in output
rsa_private_key_file: str
Absolute path to a private RSA key used
to authenticate the sftp connection
\**csvargs: kwargs
``csv_writer`` optional arguments
""" # noqa: W605
from parsons.sftp import SFTP
sftp = SFTP(host, username, password, port, rsa_private_key_file)
compression = compression or files.compression_type_for_path(remote_path)
local_path = self.to_csv(
temp_file_compression=compression,
encoding=encoding,
errors=errors,
write_header=write_header,
**csvargs,
)
sftp.put_file(local_path, remote_path)
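# Usage sketch (illustrative): host, credentials and remote path below are
# placeholders; the ".gz" suffix causes the uploaded csv to be gzip-compressed.
#
#   tbl.to_sftp_csv(
#       "uploads/people.csv.gz",
#       host="sftp.example.com",
#       username="etl_user",
#       password="not-a-real-password",
#   )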
def to_s3_csv(
self,
bucket,
key,
aws_access_key_id=None,
aws_secret_access_key=None,
compression=None,
encoding=None,
errors="strict",
write_header=True,
acl="bucket-owner-full-control",
public_url=False,
public_url_expires=3600,
use_env_token=True,
**csvargs,
):
"""
Writes the table to an s3 object as a CSV
`Args:`
bucket: str
The s3 bucket to upload to
key: str
The s3 key to name the file. If it ends in '.gz' or '.zip', the file will be
compressed.
aws_access_key_id: str
Required if not included as environmental variable
aws_secret_access_key: str
Required if not included as environmental variable
compression: str
The compression type for the s3 object. Currently "None", "zip" and "gzip" are
supported. If specified, will override the key suffix.
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
write_header: boolean
Include header in output
public_url: boolean
Create a public link to the file
public_url_expires: int
The time, in seconds, until the url expires if ``public_url`` set to ``True``.
acl: str
The S3 permissions on the file
use_env_token: boolean
Controls use of the ``AWS_SESSION_TOKEN`` environment variable for S3. Defaults
to ``True``. Set to ``False`` in order to ignore the ``AWS_SESSION_TOKEN`` env
variable even if the ``aws_session_token`` argument was not passed in.
\**csvargs: kwargs
``csv_writer`` optional arguments
`Returns:`
Public url if specified, otherwise ``None``.
""" # noqa: W605
compression = compression or files.compression_type_for_path(key)
csv_name = files.extract_file_name(key, include_suffix=False) + ".csv"
# Save the CSV as a temp file
local_path = self.to_csv(
temp_file_compression=compression,
encoding=encoding,
errors=errors,
write_header=write_header,
csv_name=csv_name,
**csvargs,
)
# Put the file on S3
from parsons.aws import S3
self.s3 = S3(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
use_env_token=use_env_token,
)
self.s3.put_file(bucket, key, local_path, acl=acl)
if public_url:
return self.s3.get_url(bucket, key, expires_in=public_url_expires)
else:
return None
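# Usage sketch (illustrative): bucket and key are placeholders, and AWS
# credentials are assumed to be set in the environment. A signed URL is
# returned because ``public_url=True``.
#
#   url = tbl.to_s3_csv("my-bucket", "exports/people.csv.gz", public_url=True)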
def to_gcs_csv(
self,
bucket_name,
blob_name,
gcs_client=None,
app_creds=None,
project=None,
compression=None,
encoding=None,
errors="strict",
write_header=True,
public_url=False,
public_url_expires=60,
**csvargs,
):
"""
Writes the table to a Google Cloud Storage blob as a CSV.
`Args:`
bucket_name: str
The bucket to upload to
blob_name: str
The blob to name the file. If it ends in '.gz' or '.zip', the file will be
compressed.
gcs_client: object
An existing GoogleCloudStorage client to use. If not passed, one will be
created from ``app_creds`` and ``project``.
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
compression: str
The compression type for the csv. Currently "None", "zip" and "gzip" are
supported. If specified, will override the key suffix.
encoding: str
The CSV encoding type for `csv.writer()
<https://docs.python.org/3/library/csv.html#csv.writer>`_
errors: str
How encoding errors are handled; defaults to 'strict', which raises an error
write_header: boolean
Include header in output
public_url: boolean
Create a public link to the file
public_url_expires: int
The time, in minutes, until the url expires if ``public_url`` set to ``True``.
\**csvargs: kwargs
``csv_writer`` optional arguments
`Returns:`
Public url if specified, otherwise ``None``.
""" # noqa: W605
compression = compression or files.compression_type_for_path(blob_name)
csv_name = files.extract_file_name(blob_name, include_suffix=False) + ".csv"
# Save the CSV as a temp file
local_path = self.to_csv(
temp_file_compression=compression,
encoding=encoding,
errors=errors,
write_header=write_header,
csv_name=csv_name,
**csvargs,
)
if not gcs_client:
from parsons.google.google_cloud_storage import GoogleCloudStorage
gcs_client = GoogleCloudStorage(app_creds=app_creds, project=project)
gcs_client.put_blob(bucket_name, blob_name, local_path)
if public_url:
return gcs_client.get_url(bucket_name, blob_name, expires_in=public_url_expires)
else:
return None
def to_redshift(
self,
table_name,
username=None,
password=None,
host=None,
db=None,
port=None,
**copy_args,
):
"""
Write a table to a Redshift database. Note, this requires you to pass
AWS S3 credentials or store them as environmental variables.
`Args:`
table_name: str
The table name and schema (``my_schema.my_table``) to write the data to.
username: str
Required if env variable ``REDSHIFT_USERNAME`` not populated
password: str
Required if env variable ``REDSHIFT_PASSWORD`` not populated
host: str
Required if env variable ``REDSHIFT_HOST`` not populated
db: str
Required if env variable ``REDSHIFT_DB`` not populated
port: int
Required if env variable ``REDSHIFT_PORT`` not populated. Port 5439 is typical.
\**copy_args: kwargs
See :func:`~parsons.databases.Redshift.copy` for options.
`Returns:`
``None``
""" # noqa: W605
from parsons.databases.redshift import Redshift
rs = Redshift(username=username, password=password, host=host, db=db, port=port)
rs.copy(self, table_name, **copy_args)
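# Usage sketch (illustrative): assumes Redshift and S3 credentials are set as
# environment variables; ``if_exists`` is one of the options documented on
# ``Redshift.copy``.
#
#   tbl.to_redshift("scratch.people", if_exists="drop")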
def to_postgres(
self,
table_name,
username=None,
password=None,
host=None,
db=None,
port=None,
**copy_args,
):
"""
Write a table to a Postgres database.
`Args:`
table_name: str
The table name and schema (``my_schema.my_table``) to write the data to.
username: str
Required if env variable ``PGUSER`` not populated
password: str
Required if env variable ``PGPASSWORD`` not populated
host: str
Required if env variable ``PGHOST`` not populated
db: str
Required if env variable ``PGDATABASE`` not populated
port: int
Required if env variable ``PGPORT`` not populated.
\**copy_args: kwargs
See :func:`~parsons.databases.Postgres.copy` for options.
`Returns:`
``None``
""" # noqa: W605
from parsons.databases.postgres import Postgres
pg = Postgres(username=username, password=password, host=host, db=db, port=port)
pg.copy(self, table_name, **copy_args)
def to_bigquery(
self,
table_name: str,
app_creds: Optional[str] = None,
project: Optional[str] = None,
**kwargs,
):
"""
Write a table to BigQuery
`Args`:
table_name: str
Table name to write to in BigQuery; this should be in `schema.table` format
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
**kwargs: kwargs
Additional keyword arguments passed into the `.copy()` function (`if_exists`,
`max_errors`, etc.)
`Returns`:
``None``
"""
from parsons import GoogleBigQuery as BigQuery
bq = BigQuery(app_creds=app_creds, project=project)
bq.copy(self, table_name=table_name, **kwargs)
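# Usage sketch (illustrative): the dataset/table name is a placeholder and
# ``if_exists`` is one of the ``copy()`` kwargs noted in the docstring above.
#
#   tbl.to_bigquery("my_dataset.people", if_exists="append")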
def to_petl(self):
"""
Return the underlying petl table object.
`Returns:`
petl table object
"""
return self.table
def to_civis(
self,
table,
api_key=None,
db=None,
max_errors=None,
existing_table_rows="fail",
diststyle=None,
distkey=None,
sortkey1=None,
sortkey2=None,
wait=True,
**civisargs,
):
"""
Write the table to a Civis Redshift cluster. Additional keyword
arguments can be passed to `civis.io.dataframe_to_civis()
<https://civis-python.readthedocs.io/en/v1.9.0/generated/civis.io.dataframe_to_civis.html#civis.io.dataframe_to_civis>`_ # noqa: E501
`Args:`
table: str
The schema and table you want to upload to. E.g.,
'scratch.table'. Schemas or tablenames with periods must be
double quoted, e.g. 'scratch."my.table"'.
api_key: str
Your Civis API key. If not given, the CIVIS_API_KEY environment
variable will be used.
db: str or int
The Civis Database. Can be database name or ID
max_errors: int
The maximum number of rows with errors to remove from
the import before failing.
diststyle: str
The distribution style for the table. One of `'even'`, `'all'`
or `'key'`.
existing_table_rows: str
The behaviour if a table with the requested name already
exists. One of `'fail'`, `'truncate'`, `'append'` or `'drop'`.
Defaults to `'fail'`.
distkey: str
The column to use as the distkey for the table.
sortkey1: str
The column to use as the sortkey for the table.
sortkey2: str
The second column in a compound sortkey for the table.
wait: boolean
Wait for write job to complete before exiting method.
"""
from parsons.civis.civisclient import CivisClient
civis = CivisClient(db=db, api_key=api_key)
return civis.table_import(
self,
table,
max_errors=max_errors,
existing_table_rows=existing_table_rows,
diststyle=diststyle,
distkey=distkey,
sortkey1=sortkey1,
sortkey2=sortkey2,
wait=wait,
**civisargs,
)
@classmethod
def from_csv(cls, local_path, **csvargs):
"""
Create a ``parsons table`` object from a CSV file
`Args:`
local_path: obj
A csv formatted local path, url or ftp. If this is a
file path that ends in ".gz", the file will be decompressed first.
\**csvargs: kwargs
``csv_reader`` optional arguments
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
""" # noqa: W605
remote_prefixes = ["http://", "https://", "ftp://", "s3://"]
if any(map(local_path.startswith, remote_prefixes)):
is_remote_file = True
else:
is_remote_file = False
if not is_remote_file and not files.has_data(local_path):
raise ValueError("CSV file is empty")
return cls(petl.fromcsv(local_path, **csvargs))
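# Usage sketch (illustrative): file names and the URL are placeholders;
# assumes ``Table`` is the parsons.Table class that mixes in ToFrom.
#
#   from parsons import Table
#   local_tbl = Table.from_csv("people.csv")
#   remote_tbl = Table.from_csv("https://example.com/people.csv")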
@classmethod
def from_csv_string(cls, str, **csvargs):
"""
Create a ``parsons table`` object from a string representing a CSV.
`Args:`
str: str
The string object to convert to a table
**csvargs: kwargs
``csv_reader`` optional arguments
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
bytesio = io.BytesIO(str.encode("utf-8"))
memory_source = petl.io.sources.MemorySource(bytesio.read())
return cls(petl.fromcsv(memory_source, **csvargs))
@classmethod
def from_columns(cls, cols, header=None):
"""
Create a ``parsons table`` from a list of lists organized as columns
`Args:`
cols: list
A list of lists organized as columns
header: list
List of column names. If not specified, will use dummy column names
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
return cls(petl.fromcolumns(cols, header=header))
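# Usage sketch (illustrative): build a small table from column lists with
# explicit header names (assumes parsons.Table as above).
#
#   from parsons import Table
#   tbl = Table.from_columns([[1, 2, 3], ["a", "b", "c"]], header=["id", "letter"])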
@classmethod
def from_json(cls, local_path, header=None, line_delimited=False):
"""
Create a ``parsons table`` from a json file
`Args:`
local_path: str
A JSON formatted local path, url or ftp. If this is a
file path that ends in ".gz", the file will be decompressed first.
header: list
List of columns to use for the destination table. If omitted, columns will
be inferred from the initial data in the file.
line_delimited: bool
Whether the file is line-delimited JSON (with a row on each line), or a proper
JSON file.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
if line_delimited:
if files.is_gzip_path(local_path):
open_fn = gzip.open
else:
open_fn = open
with open_fn(local_path, "r") as file:
rows = [json.loads(line) for line in file]
return cls(rows)
else:
return cls(petl.fromjson(local_path, header=header))
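# Usage sketch (illustrative): file names are placeholders; pass
# ``line_delimited=True`` for newline-delimited JSON (assumes parsons.Table).
#
#   from parsons import Table
#   tbl = Table.from_json("rows.json")
#   ndjson_tbl = Table.from_json("rows.jsonl", line_delimited=True)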
@classmethod
def from_redshift(cls, sql, username=None, password=None, host=None, db=None, port=None):
"""
Create a ``parsons table`` from a Redshift query.
To pull an entire Redshift table, use a query like ``SELECT * FROM tablename``.
`Args:`
sql: str
A valid SQL statement
username: str
Required if env variable ``REDSHIFT_USERNAME`` not populated
password: str
Required if env variable ``REDSHIFT_PASSWORD`` not populated
host: str
Required if env variable ``REDSHIFT_HOST`` not populated
db: str
Required if env variable ``REDSHIFT_DB`` not populated
port: int
Required if env variable ``REDSHIFT_PORT`` not populated. Port 5439 is typical.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
from parsons.databases.redshift import Redshift
rs = Redshift(username=username, password=password, host=host, db=db, port=port)
return rs.query(sql)
@classmethod
def from_postgres(cls, sql, username=None, password=None, host=None, db=None, port=None):
"""
Args:
sql: str
A valid SQL statement
username: str
Required if env variable ``PGUSER`` not populated
password: str
Required if env variable ``PGPASSWORD`` not populated
host: str
Required if env variable ``PGHOST`` not populated
db: str
Required if env variable ``PGDATABASE`` not populated
port: int
Required if env variable ``PGPORT`` not populated.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
from parsons.databases.postgres import Postgres
pg = Postgres(username=username, password=password, host=host, db=db, port=port)
return pg.query(sql)
@classmethod
def from_s3_csv(
cls,
bucket,
key,
from_manifest=False,
aws_access_key_id=None,
aws_secret_access_key=None,
**csvargs,
):
"""
Create a ``parsons table`` from a key in an S3 bucket.
`Args:`
bucket: str
The S3 bucket.
key: str
The S3 key
from_manifest: bool
If True, treats `key` as a manifest file and loads all urls into a `parsons.Table`.
Defaults to False.
aws_access_key_id: str
Required if not included as environmental variable.
aws_secret_access_key: str
Required if not included as environmental variable.
\**csvargs: kwargs
``csv_reader`` optional arguments
`Returns:`
`parsons.Table` object
""" # noqa: W605
from parsons.aws import S3
s3 = S3(aws_access_key_id, aws_secret_access_key)
if from_manifest:
with open(s3.get_file(bucket, key)) as fd:
manifest = json.load(fd)
s3_keys = [x["url"] for x in manifest["entries"]]
else:
s3_keys = [f"s3://{bucket}/{key}"]
tbls = []
for key in s3_keys:
# TODO handle urls that end with '/', i.e. urls that point to "folders"
_, _, bucket_, key_ = key.split("/", 3)
file_ = s3.get_file(bucket_, key_)
if files.compression_type_for_path(key_) == "zip":
file_ = zip_archive.unzip_archive(file_)
tbls.append(petl.fromcsv(file_, **csvargs))
return cls(petl.cat(*tbls))
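# Usage sketch (illustrative): bucket and key are placeholders and AWS
# credentials are assumed to be set in the environment (assumes parsons.Table).
#
#   from parsons import Table
#   tbl = Table.from_s3_csv("my-bucket", "exports/people.csv")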
@classmethod
def from_bigquery(cls, sql: str, app_creds: Optional[str] = None, project: Optional[str] = None):
"""
Create a ``parsons table`` from a BigQuery statement.
To pull an entire BigQuery table, use a query like ``SELECT * FROM {{ table }}``.
`Args`:
sql: str
A valid SQL statement
app_creds: str
A credentials json string or a path to a json file. Not required
if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
TODO - Should users be able to pass in kwargs here? For parameters?
`Returns`:
Parsons Table
See :ref:`parsons-table` for output options.
"""
from parsons import GoogleBigQuery as BigQuery
bq = BigQuery(app_creds=app_creds, project=project)
return bq.query(sql=sql)
@classmethod
def from_dataframe(cls, dataframe, include_index=False):
"""
Create a ``parsons table`` from a Pandas dataframe.
`Args:`
dataframe: dataframe
A valid Pandas dataframe object
include_index: boolean
Include index column
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
return cls(petl.fromdataframe(dataframe, include_index=include_index))
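# Usage sketch (illustrative): round-trip a small pandas DataFrame through a
# parsons Table (assumes parsons.Table and pandas installed).
#
#   import pandas as pd
#   from parsons import Table
#   df = pd.DataFrame({"id": [1, 2], "name": ["Jane", "Sam"]})
#   tbl = Table.from_dataframe(df)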