Source code for parsons.databases.postgres.postgres

from parsons.databases.postgres.postgres_core import PostgresCore
from parsons.databases.table import BaseTable
import logging
import os


logger = logging.getLogger(__name__)


[docs]class Postgres(PostgresCore): """ A Postgres class to connect to database. Credentials can be passed from a ``.pgpass`` file stored in your home directory or with environmental variables. Args: username: str Required if env variable ``PGUSER`` not populated password: str Required if env variable ``PGPASSWORD`` not populated host: str Required if env variable ``PGHOST`` not populated db: str Required if env variable ``PGDATABASE`` not populated port: int Required if env variable ``PGPORT`` not populated. timeout: int Seconds to timeout if connection not established. """ def __init__(self, username=None, password=None, host=None, db=None, port=5432, timeout=10): self.username = username or os.environ.get('PGUSER') self.password = password or os.environ.get('PGPASSWORD') self.host = host or os.environ.get('PGHOST') self.db = db or os.environ.get('PGDATABASE') self.port = port or os.environ.get('PGPORT') # Check if there is a pgpass file. Psycopg2 will search for this file first when # creating a connection. pgpass = os.path.isfile(os.path.expanduser('~/.pgpass')) if not any([self.username, self.password, self.host, self.db]) and not pgpass: raise ValueError('Connection arguments missing. Please pass as a pgpass file, kwargs', 'or env variables.') self.timeout = timeout self.dialect = 'postgres'
[docs] def copy(self, tbl, table_name, if_exists='fail'): """ Copy a :ref:`parsons-table` to Postgres. tbl: parsons.Table A Parsons table object table_name: str The destination schema and table (e.g. ``my_schema.my_table``) if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. """ with self.connection() as connection: # Auto-generate table if self._create_table_precheck(connection, table_name, if_exists): # Create the table # To Do: Pass in the advanced configuration parameters. sql = self.create_statement(tbl, table_name) self.query_with_connection(sql, connection, commit=False) logger.info(f'{table_name} created.') sql = f"COPY {table_name} FROM STDIN CSV HEADER;" with self.cursor(connection) as cursor: cursor.copy_expert(sql, open(tbl.to_csv(), "r")) logger.info(f'{tbl.num_rows} rows copied to {table_name}.')
def table(self, table_name): # Return a Postgres table object return PostgresTable(self, table_name)
class PostgresTable(BaseTable): # Postgres table object. pass