import logging
import uuid
import gspread
from parsons.etl.table import Table
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
hexavigesimal,
)
logger = logging.getLogger(__name__)
[docs]class GoogleSheets:
"""
A connector for Google Sheets, handling data import and export.
`Args:`
google_keyfile_dict: dict
A dictionary of Google Drive API credentials, parsed from JSON provided
by the Google Developer Console. Required if env variable
``GOOGLE_DRIVE_CREDENTIALS`` is not populated.
subject: string
In order to use account impersonation, pass in the email address of the account to be
impersonated as a string.
"""
def __init__(self, google_keyfile_dict=None, subject=None):
scope = [
"https://spreadsheets.google.com/feeds",
"https://www.googleapis.com/auth/drive",
]
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
google_keyfile_dict,
"GOOGLE_DRIVE_CREDENTIALS",
target_env_var_name=env_credentials_path,
)
credentials = load_google_application_credentials(
env_credentials_path, scopes=scope, subject=subject
)
self.gspread_client = gspread.authorize(credentials)
def _get_worksheet(self, spreadsheet_id, worksheet=0):
# Internal method to retrieve a worksheet object.
# Check if the worksheet is an integer, if so find the sheet by index
if isinstance(worksheet, int):
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(worksheet)
elif isinstance(worksheet, str):
idx = self.list_worksheets(spreadsheet_id).index(worksheet)
try:
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(idx)
except: # noqa: E722
raise ValueError(f"Couldn't find worksheet {worksheet}")
else:
raise ValueError(f"Couldn't find worksheet index or title {worksheet}")
[docs] def list_worksheets(self, spreadsheet_id):
"""
Return a list of worksheets in the spreadsheet.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
`Returns:`
list
A List of worksheets order by their index
"""
worksheets = self.gspread_client.open_by_key(spreadsheet_id).worksheets()
return [w.title for w in worksheets]
[docs] def get_worksheet_index(self, spreadsheet_id, title):
"""
Get the first sheet in a Google spreadsheet with the given title. The
title is case sensitive and the index begins with 0.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
title: str
The sheet title
`Returns:`
str
The sheet index
"""
sheets = self.gspread_client.open_by_key(spreadsheet_id).worksheets()
for index, sheet in enumerate(sheets):
if sheet.title == title:
return index
raise ValueError(f"Couldn't find sheet with title {title}")
[docs] def get_worksheet(self, spreadsheet_id, worksheet=0):
"""
Create a ``parsons table`` from a sheet in a Google spreadsheet, given the sheet index.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
worksheet: str or int
The index or the title of the worksheet. The index begins with
0.
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
worksheet = self._get_worksheet(spreadsheet_id, worksheet)
tbl = Table(worksheet.get_all_values())
logger.info(f"Retrieved worksheet with {tbl.num_rows} rows.")
return tbl
[docs] def share_spreadsheet(
self,
spreadsheet_id,
sharee,
share_type="user",
role="reader",
notify=True,
notify_message=None,
with_link=False,
):
"""
Share a spreadsheet with a user, group of users, domain and/or the public.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
sharee: str
User or group e-mail address, domain name to share the spreadsheet
with. To share publicly, set sharee value to ``None``.
share_type: str
The sharee type. Allowed values are: ``user``, ``group``, ``domain``,
``anyone``.
role: str
The primary role for this user. Allowed values are: ``owner``,
``writer``, ``reader``.
notify: boolean
Whether to send an email to the target user/domain.
email_message: str
The email to be sent if notify kwarg set to True.
with_link: boolean
Whether a link is required for this permission.
"""
spreadsheet = self.gspread_client.open_by_key(spreadsheet_id)
spreadsheet.share(
sharee,
share_type,
role,
notify=notify,
email_message=notify_message,
with_link=with_link,
)
logger.info(f"Shared spreadsheet {spreadsheet_id}.")
[docs] def get_spreadsheet_permissions(self, spreadsheet_id):
"""
List the permissioned users and groups for a spreadsheet.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
spreadsheet = self.gspread_client.open_by_key(spreadsheet_id)
tbl = Table(spreadsheet.list_permissions())
logger.info(f"Retrieved permissions for {spreadsheet_id} spreadsheet.")
return tbl
[docs] def create_spreadsheet(self, title, editor_email=None, folder_id=None):
"""
Creates a new Google spreadsheet. Optionally shares the new doc with
the given email address. Optionally creates the sheet in a specified folder.
`Args:`
title: str
The human-readable title of the new spreadsheet
editor_email: str (optional)
Email address which should be given permissions on this spreadsheet.
Tip: You may want to share this file with the service account.
folder_id: str (optional)
ID of the Google folder where the spreadsheet should be created.
Tip: Get this from the folder URL.
Anyone shared on the folder will have access to the spreadsheet.
`Returns:`
str
The spreadsheet ID
"""
spreadsheet = self.gspread_client.create(title, folder_id=folder_id)
if editor_email:
self.gspread_client.insert_permission(
spreadsheet.id,
editor_email,
perm_type="user",
role="writer",
)
logger.info(f"Created spreadsheet {spreadsheet.id}")
return spreadsheet.id
[docs] def delete_spreadsheet(self, spreadsheet_id):
"""
Deletes a Google spreadsheet.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
"""
self.gspread_client.del_spreadsheet(spreadsheet_id)
logger.info(f"Deleted spreadsheet {spreadsheet_id}")
[docs] def add_sheet(self, spreadsheet_id, title=None, rows=100, cols=25):
"""
Adds a sheet to a Google spreadsheet.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
rows: int
Number of rows
cols
Number of cols
`Returns:`
str
The sheet index
"""
spreadsheet = self.gspread_client.open_by_key(spreadsheet_id)
spreadsheet.add_worksheet(title, rows, cols)
sheet_count = len(spreadsheet.worksheets())
logger.info("Created worksheet.")
return sheet_count - 1
[docs] def append_to_sheet(
self, spreadsheet_id, table, worksheet=0, user_entered_value=False, **kwargs
):
"""
Append data from a Parsons table to a Google sheet. Note that the table's columns are
ignored, as we'll be keeping whatever header row already exists in the Google sheet.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
table: obj
Parsons table
worksheet: str or int
The index or the title of the worksheet. The index begins with
0.
user_entered_value: bool (optional)
If True, will submit cell values as entered (required for entering formulas).
Otherwise, values will be entered as strings or numbers only.
"""
if not table.num_rows:
logger.warning("No data provided to append, skipping.")
return
# This is in here to ensure backwards compatibility with previous versions of Parsons.
if "sheet_index" in kwargs:
worksheet = kwargs["sheet_index"]
logger.warning("Argument deprecated. Use worksheet instead.")
sheet = self._get_worksheet(spreadsheet_id, worksheet)
# Grab the existing data, so we can figure out where to start adding new data as a batch.
# TODO Figure out a way to do a batch append without having to read the whole sheet first.
# Maybe use gspread's low-level batch_update().
existing_table = self.get_worksheet(spreadsheet_id, worksheet)
# If the existing sheet is blank, then just overwrite the table.
if existing_table.num_rows == 0:
return self.overwrite_sheet(spreadsheet_id, table, worksheet, user_entered_value)
cells = []
for row_num, row in enumerate(table.data):
for col_num, cell in enumerate(row):
# Add 2 to allow for the header row, and for google sheets indexing starting at 1
sheet_row_num = existing_table.num_rows + row_num + 2
cells.append(gspread.Cell(sheet_row_num, col_num + 1, row[col_num]))
value_input_option = "RAW"
if user_entered_value:
value_input_option = "USER_ENTERED"
# Update the data in one batch
sheet.update_cells(cells, value_input_option=value_input_option)
logger.info(f"Appended {table.num_rows} rows to worksheet.")
[docs] def paste_data_in_sheet(
self, spreadsheet_id, table, worksheet=0, header=True, startrow=0, startcol=0
):
"""
Pastes data from a Parsons table to a Google sheet. Note that this may overwrite
presently existing data. This function is useful for adding data to a subsection
if an existing sheet that will have other existing data - contrast to
`overwrite_sheet` (which will fully replace any existing data) and `append_to_sheet`
(which sticks the data only after all other existing data).
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL).
table: obj
Parsons table
worksheet: str or int
The index or the title of the worksheet. The index begins with 0.
header: bool
Whether or not the header row gets pasted with the data.
startrow: int
Starting row position of pasted data. Counts from 0.
startcol: int
Starting column position of pasted data. Counts from 0.
"""
sheet = self._get_worksheet(spreadsheet_id, worksheet)
number_of_columns = len(table.columns)
number_of_rows = table.num_rows + 1 if header else table.num_rows
if not number_of_rows or not number_of_columns: # No data to paste
logger.warning(
f"No data available to paste, table size "
f"({number_of_rows}, {number_of_columns}). Skipping."
)
return
# gspread uses ranges like "C3:J7", so we need to convert to this format
data_range = (
hexavigesimal(startcol + 1)
+ str(startrow + 1)
+ ":"
+ hexavigesimal(startcol + number_of_columns)
+ str(startrow + number_of_rows)
)
# Unpack data. Hopefully this is small enough for memory
data = [[]] * table.num_rows
for row_num, row in enumerate(table.data):
data[row_num] = list(row)
if header:
sheet.update(data_range, [table.columns] + data)
else:
sheet.update(data_range, data)
logger.info(f"Pasted data to {data_range} in worksheet.")
[docs] def overwrite_sheet(
self, spreadsheet_id, table, worksheet=0, user_entered_value=False, **kwargs
):
"""
Replace the data in a Google sheet with a Parsons table, using the table's columns as the
first row.
`Args:`
spreadsheet_id: str
The ID of the spreadsheet (Tip: Get this from the spreadsheet URL)
table: obj
Parsons table
worksheet: str or int
The index or the title of the worksheet. The index begins with
0.
user_entered_value: bool (optional)
If True, will submit cell values as entered (required for entering formulas).
Otherwise, values will be entered as strings or numbers only.
"""
# This is in here to ensure backwards compatibility with previous versions of Parsons.
if "sheet_index" in kwargs:
worksheet = kwargs["sheet_index"]
logger.warning("Argument deprecated. Use worksheet instead.")
sheet = self._get_worksheet(spreadsheet_id, worksheet)
sheet.clear()
if not len(table.columns):
logger.warning("No data provided, worksheet is empty.")
return
value_input_option = "RAW"
if user_entered_value:
value_input_option = "USER_ENTERED"
# Add header row
sheet.append_row(table.columns, value_input_option=value_input_option)
if table.num_rows:
cells = []
for row_num, row in enumerate(table.data):
for col_num, _ in enumerate(row):
# We start at row #2 to keep room for the header row we added above
cells.append(gspread.Cell(row_num + 2, col_num + 1, row[col_num]))
# Update the data in one batch
sheet.update_cells(cells, value_input_option=value_input_option)
else:
logger.warning("No rows provided.")
logger.info("Overwrote worksheet.")
def read_sheet(self, spreadsheet_id, sheet_index=0):
# Deprecated method v0.14 of Parsons.
logger.warning("Deprecated method. Use get_worksheet() instead.")
return self.get_worksheet(spreadsheet_id, sheet_index)
def read_sheet_with_title(self, spreadsheet_id, title):
# Deprecated method v0.14 of Parsons.
logger.warning("Deprecated method. Use get_worksheet() instead.")
return self.get_worksheet(spreadsheet_id, title)
def get_sheet_index_with_title(self, spreadsheet_id, title):
# Deprecated method v0.14 of Parsons.
logger.warning("Deprecated method. Use get_worksheet_index instead.")
return self.get_worksheet_index(spreadsheet_id, title)