import logging
import tempfile
import uuid
from pathlib import Path
from typing import Literal
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
logger = logging.getLogger(__name__)
[docs]
class GoogleDrive:
    """
    A connector for Google Drive

    Args:
        app_creds: dict | str | Credentials
            Can be a dictionary of Google Drive API credentials, parsed from JSON provided
            by the Google Developer Console, or a path string pointing to credentials
            saved on disk, or a google.oauth2.credentials.Credentials object. Required
            if env variable ``GOOGLE_DRIVE_CREDENTIALS`` is not populated.
    """

    # Values accepted by ``share_object`` for ``role`` and ``type``.
    _VALID_ROLES = ("owner", "organizer", "fileOrganizer", "writer", "commenter", "reader")
    _VALID_TYPES = ("user", "group", "domain", "anyone")

    def __init__(
        self,
        app_creds: str | dict | Credentials | None = None,
    ):
        scopes = [
            "https://www.googleapis.com/auth/drive",
        ]
        if isinstance(app_creds, Credentials):
            credentials = app_creds
        else:
            # A random UUID is used as the name of the environment variable that
            # carries the credentials between the two helper calls below.
            env_credentials_path = str(uuid.uuid4())
            setup_google_application_credentials(
                app_creds, target_env_var_name=env_credentials_path
            )
            credentials = load_google_application_credentials(env_credentials_path, scopes=scopes)
        self.client = build(
            "drive",
            "v3",
            credentials=credentials,
            cache_discovery=False,
        )

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """
        Escape a value so it can be embedded in a single-quoted Drive query literal.

        Backslashes are doubled first, then each single quote is prefixed with a
        backslash, so a name such as ``Valentine's Day`` no longer terminates the
        literal early.
        """
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _list_all_files(
        self, query: str, fields: list[str] | None = None, **list_kwargs
    ) -> list[dict[str, str]]:
        """
        Run a ``files().list`` query and collect the results from every page.

        Args:
            query: str
                The Drive search query (the ``q`` parameter)
            fields: list
                File fields to request. Defaults to ``["id", "name"]``.
            **list_kwargs:
                Extra keyword arguments forwarded to ``files().list``
        Returns:
            List of file dicts
        """
        if not fields:
            fields = ["id", "name"]
        field_mask = "nextPageToken, files({})".format(",".join(fields))
        results = []
        page_token = None
        while True:
            response = (
                self.client.files()
                .list(
                    q=query,
                    spaces="drive",
                    fields=field_mask,
                    pageToken=page_token,
                    **list_kwargs,
                )
                .execute()
            )
            results.extend(response.get("files", []))
            page_token = response.get("nextPageToken")
            if page_token is None:
                break
        return results

    def create_folder(self, name: str, parents: list[str] | str | None = None) -> str:
        """
        Create a folder in Drive.

        Args:
            name: str
                The name of the new folder
            parents: list | str
                A parent folder ID or a list of parent folder IDs. If not
                provided, an empty parent list is sent.
        Returns:
            str: The ID of the new folder
        """
        if isinstance(parents, str):
            parents = [parents]
        elif parents is None:
            parents = []
        response = (
            self.client.files()
            .create(
                body={
                    "name": name,
                    "mimeType": "application/vnd.google-apps.folder",
                    "parents": parents,
                },
                fields="id",
            )
            .execute()
        )
        return response.get("id")

    def find_subfolder(self, subfolder_name: str, parent_folder_id: str) -> str | None:
        """
        Find a folder by exact name among the direct children of a folder.

        Args:
            subfolder_name: str
                The folder name to look for
            parent_folder_id: str
                The ID of the folder to search in
        Returns:
            str: The ID of the first matching folder, or ``None`` if there is none
        """
        parent = self._escape_query_value(parent_folder_id)
        query = f"'{parent}' in parents and mimeType='application/vnd.google-apps.folder'"
        # All pages are read so a match beyond the first page is not missed.
        for folder in self._list_all_files(query, ["id", "name"]):
            if folder.get("name") == subfolder_name:
                return folder.get("id")
        return None

    def find_file_in_folder(
        self, file_name: str, folder_id: str, fields: list[str] | None = None
    ) -> list[dict[str, str]]:
        """
        Find files with a given name inside a folder.

        Args:
            file_name: str
                The file name to look for
            folder_id: str
                The ID of the folder to search in
            fields: list
                File fields to return. Defaults to ``["id", "name"]``.
        Returns:
            List of file dicts
        """
        folder = self._escape_query_value(folder_id)
        name = self._escape_query_value(file_name)
        query = f"'{folder}' in parents and name = '{name}'"
        return self._list_all_files(query, fields)

    def list_files_in_folder(
        self, folder_id: str, fields: list[str] | None = None
    ) -> list[dict[str, str]]:
        """
        List the files that have the given folder as a parent.

        Args:
            folder_id: str
                The ID of the folder to list
            fields: list
                File fields to return. Defaults to ``["id", "name"]``.
        Returns:
            List of file dicts
        """
        folder = self._escape_query_value(folder_id)
        # NOTE(review): the Drive API documents ``supportsTeamDrives`` as deprecated
        # in favour of ``supportsAllDrives``; kept unchanged here -- confirm before
        # switching.
        return self._list_all_files(
            f"'{folder}' in parents",
            fields,
            supportsTeamDrives=True,
            includeItemsFromAllDrives=True,
        )

    def empty_folder(self, folder_id: str) -> None:
        """
        Delete every file returned by ``list_files_in_folder`` for a folder.

        Args:
            folder_id: str
                The ID of the folder to empty
        """
        folder_contents = self.list_files_in_folder(folder_id)
        for drive_file in folder_contents:
            self.client.files().delete(
                fileId=drive_file.get("id"),
            ).execute()

    def download_file(self, file_id: str) -> str:
        """
        Download file from Drive to disk.

        Args:
            file_id: str
                The ID of the file to download
        Returns:
            str: The local path of the downloaded file
        """
        request = self.client.files().get_media(fileId=file_id)
        # ``delete=False`` keeps the file after the handle is closed; the context
        # manager guarantees the handle itself is closed.
        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as file:
            downloader = MediaIoBaseDownload(file, request)
            done = False
            while not done:
                _, done = downloader.next_chunk()
        return file.name

    def upload_file(self, file_path: str, parent_folder_id: str) -> str:
        """
        Upload a local file into a Drive folder, named after the local file.

        Args:
            file_path: str
                Path of the local file to upload
            parent_folder_id: str
                The ID of the destination folder
        Returns:
            str: The ID of the uploaded file
        """
        file_metadata = {
            "name": Path(file_path).name,
            "parents": [parent_folder_id],
        }
        media = MediaFileUpload(file_path)
        response = (
            self.client.files().create(body=file_metadata, media_body=media, fields="id").execute()
        )
        return response.get("id")

    def replace_file(self, file_path: str, file_id: str) -> str:
        """
        Replace file in drive.

        Args:
            file_path: str
                Path of the local file holding the new content
            file_id: str
                The ID of the Drive file to update
        Returns:
            str: The ID returned by the update call
        """
        media = MediaFileUpload(file_path)
        resp = self.client.files().update(fileId=file_id, media_body=media, fields="id").execute()
        return resp.get("id")

    def upsert_file(self, file_path: str, parent_folder_id: str) -> str:
        """
        Create or replace file in drive folder, based on file name.

        Args:
            file_path: str
                Path of the local file
            parent_folder_id: str
                The ID of the Drive folder
        Returns:
            str: The ID of the replaced or newly uploaded file
        """
        file_name = Path(file_path).name
        matches = self.find_file_in_folder(file_name, parent_folder_id)
        if matches:
            # When several files share the name, the first one returned is replaced.
            return self.replace_file(file_path, matches[0].get("id"))
        return self.upload_file(file_path, parent_folder_id)

    def copy_file(
        self,
        file_id: str,
        destination_folder_id: str | None = None,
        new_name: str | None = None,
    ) -> str:
        """
        Copy a file within Google Drive.

        Args:
            file_id: str
                The ID of the file to copy
            destination_folder_id: str
                The ID of the destination folder. If not provided, copies to the same parent folder.
            new_name: str
                The name for the copied file. If not provided, Drive will use "Copy of [original name]".
        Returns:
            str: The ID of the newly created copy
        """
        body = {}
        if new_name:
            body["name"] = new_name
        if destination_folder_id:
            body["parents"] = [destination_folder_id]
        response = self.client.files().copy(fileId=file_id, body=body, fields="id").execute()
        return response.get("id")

    def get_permissions(self, file_id: str) -> dict:
        """
        Args:
            file_id: str
                this is the ID of the object whose permissions you want to list
        Returns:
            permission dict
        """
        return self.client.permissions().list(fileId=file_id).execute()

    def _share_object(self, file_id: str, permission_dict: dict) -> dict:
        """Create a single permission on ``file_id`` and return the API response."""
        return self.client.permissions().create(fileId=file_id, body=permission_dict).execute()

    def share_object(
        self,
        file_id: str,
        email_addresses: list[str] | None = None,
        role: Literal[
            "owner",
            "organizer",
            "fileOrganizer",
            "writer",
            "commenter",
            "reader",
        ] = "reader",
        type: Literal["user", "group", "domain", "anyone"] = "user",
    ) -> list[dict]:
        """
        Args:
            file_id: str
                this is the ID of the object you are hoping to share
            email_addresses: list
                this is the list of the email addresses you want to share;
                set to a list of domains like `['domain']` if you choose `type='domain'`;
                set to `None` if you choose `type='anyone'`
            role: str
                Options are -- owner, organizer, fileOrganizer, writer, commenter, reader
                https://developers.google.com/drive/api/guides/ref-roles
            type: str
                Options are -- user, group, domain, anyone
        Returns:
            List of permission objects
        Raises:
            ValueError: if ``role`` or ``type`` is not an allowed value, or if
                ``email_addresses`` is ``None`` for a type other than ``anyone``
        """
        if role not in self._VALID_ROLES:
            raise ValueError(
                f"{role} not from the allowed list of: {', '.join(self._VALID_ROLES)}"
            )
        if type not in self._VALID_TYPES:
            raise ValueError(
                f"{type} not from the allowed list of: {', '.join(self._VALID_TYPES)}"
            )

        if type == "anyone":
            # An "anyone" permission has no grantee, so exactly one permission
            # is created and ``email_addresses`` is not consulted.
            permissions = [{"type": type, "role": role}]
        elif email_addresses is None:
            raise ValueError(f"email_addresses is required when type is '{type}'")
        else:
            # Domain permissions carry the value under "domain"; user and group
            # permissions carry it under "emailAddress".
            grantee_key = "domain" if type == "domain" else "emailAddress"
            permissions = [
                {"type": type, "role": role, grantee_key: address} for address in email_addresses
            ]

        return [self._share_object(file_id, permission) for permission in permissions]

    def transfer_ownership(self, file_id: str, new_owner_email: str) -> None:
        """
        Args:
            file_id: str
                this is the ID of the object whose ownership you want to transfer
            new_owner_email: str
                the email address of the intended new owner
        """
        permissions = self.get_permissions(file_id).get("permissions", [])
        # The owner is the permission whose role is "owner"; permission dicts
        # do not have a key that is literally named "owner".
        has_owner = any(p.get("role") == "owner" for p in permissions)
        if not has_owner:
            logger.info("File does not have a current owner.")
            return

        new_owner_permission = {
            "type": "user",
            "role": "owner",
            "emailAddress": new_owner_email,
        }
        # Ownership is handed over by granting the owner role to the new user;
        # the API requires ``transferOwnership=True`` as an acknowledgement.
        self.client.permissions().create(
            fileId=file_id,
            body=new_owner_permission,
            transferOwnership=True,
        ).execute()
        logger.info("Ownership transferred successfully to %s.", new_owner_email)