Source code for parsons.azure.azure_blob_storage

import logging
import os
from urllib.parse import urlparse

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, ContentSettings, generate_blob_sas

from parsons.utilities import check_env, files

logger = logging.getLogger(__name__)


class AzureBlobStorage:
    """
    Instantiate AzureBlobStorage Class for a given Azure storage account.

    `Args:`
        account_name: str
            The name of the Azure storage account to use. Not required if ``AZURE_ACCOUNT_NAME``
            environment variable is set, or if ``account_url`` is supplied.
        credential: str
            An account shared access key with access to the Azure storage account, an SAS token
            string, or an instance of a TokenCredentials class. Not required if
            ``AZURE_CREDENTIAL`` environment variable is set.
        account_domain: str
            The domain of the Azure storage account, defaults to "blob.core.windows.net".
            Not required if ``AZURE_ACCOUNT_DOMAIN`` environment variable is set or if
            ``account_url`` is supplied.
        account_url: str
            The account URL for the Azure storage account including the account name and domain.
            Not required if ``AZURE_ACCOUNT_URL`` environment variable is set. When that
            environment variable is set, it is used in preference to this argument.
    `Returns:`
        `AzureBlobStorage`
    """

    def __init__(self, account_name=None, credential=None,
                 account_domain='blob.core.windows.net', account_url=None):
        # os.getenv returns the environment value whenever the variable is set, so
        # AZURE_ACCOUNT_URL wins over the ``account_url`` argument here.
        self.account_url = os.getenv('AZURE_ACCOUNT_URL', account_url)
        self.credential = check_env.check('AZURE_CREDENTIAL', credential)

        if not self.account_url:
            # No URL available: assemble one from the account name and domain.
            self.account_name = check_env.check('AZURE_ACCOUNT_NAME', account_name)
            self.account_domain = check_env.check('AZURE_ACCOUNT_DOMAIN', account_domain)
            self.account_url = f'https://{self.account_name}.{self.account_domain}/'
        else:
            # Accept bare host names by defaulting the scheme to https.
            if not self.account_url.startswith('http'):
                self.account_url = f'https://{self.account_url}'
            # Update the account name and domain if a URL is supplied: the first host
            # label is the account name, the remaining labels form the domain.
            host_labels = urlparse(self.account_url).netloc.split('.')
            self.account_name = host_labels[0]
            self.account_domain = '.'.join(host_labels[1:])

        self.client = BlobServiceClient(account_url=self.account_url, credential=self.credential)

    def list_containers(self):
        """
        Returns a list of container names for the storage account

        `Returns:`
            list[str]
                List of container names
        """

        container_names = [container.name for container in self.client.list_containers()]
        logger.info(f'Found {len(container_names)} containers.')
        return container_names

    def container_exists(self, container_name):
        """
        Verify that a container exists within the storage account

        `Args:`
            container_name: str
                The name of the container
        `Returns:`
            bool
        """

        container_client = self.get_container(container_name)
        try:
            # Fetching the properties is used purely as an existence probe.
            container_client.get_container_properties()
        except ResourceNotFoundError:
            logger.info(f'{container_name} does not exist.')
            return False
        logger.info(f'{container_name} exists.')
        return True

    def get_container(self, container_name):
        """
        Returns a container client

        `Args:`
            container_name: str
                The name of the container
        `Returns:`
            `ContainerClient`
        """

        logger.info(f'Returning {container_name} container client')
        return self.client.get_container_client(container_name)

    def create_container(self, container_name, metadata=None, public_access=None, **kwargs):
        """
        Create a container

        `Args:`
            container_name: str
                The name of the container
            metadata: Optional[dict[str, str]]
                A dict with metadata to associate with the container.
            public_access: Optional[Union[PublicAccess, str]]
                Settings for public access on the container, can be 'container' or 'blob' if not
                ``None``
            kwargs:
                Additional arguments to be supplied to the Azure Blob Storage API. See `Azure Blob Storage
                SDK documentation <https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python#create-container-name--metadata-none--public-access-none----kwargs->`_
                for more info.
        `Returns:`
            `ContainerClient`
        """  # noqa

        container_client = self.client.create_container(
            container_name, metadata=metadata, public_access=public_access, **kwargs
        )
        logger.info(f'Created {container_name} container.')
        return container_client

    def delete_container(self, container_name):
        """
        Delete a container.

        `Args:`
            container_name: str
                The name of the container
        `Returns:`
            ``None``
        """

        self.client.delete_container(container_name)
        logger.info(f'{container_name} container deleted.')

    def list_blobs(self, container_name, name_starts_with=None):
        """
        List the blobs in a container

        `Args:`
            container_name: str
                The name of the container
            name_starts_with: Optional[str]
                A prefix to filter blob names
        `Returns:`
            list
                The items yielded by the container client's ``list_blobs`` call, collected
                into a list. These are the SDK's blob objects rather than bare name strings
                (per the Azure SDK documentation they are ``BlobProperties``; read ``.name``
                on each item to get the blob name).
        """

        container_client = self.get_container(container_name)
        blobs = list(container_client.list_blobs(name_starts_with=name_starts_with))
        logger.info(f'Found {len(blobs)} blobs in {container_name} container.')
        return blobs

    def blob_exists(self, container_name, blob_name):
        """
        Verify that a blob exists in the specified container

        `Args:`
            container_name: str
                The container name
            blob_name: str
                The blob name
        `Returns:`
            bool
        """

        blob_client = self.get_blob(container_name, blob_name)
        try:
            # Fetching the properties is used purely as an existence probe.
            blob_client.get_blob_properties()
        except ResourceNotFoundError:
            logger.info(f'{blob_name} does not exist in {container_name} container.')
            return False
        logger.info(f'{blob_name} exists in {container_name} container.')
        return True

    def get_blob(self, container_name, blob_name):
        """
        Get a blob object

        `Args:`
            container_name: str
                The container name
            blob_name: str
                The blob name
        `Returns:`
            `BlobClient`
        """

        blob_client = self.client.get_blob_client(container_name, blob_name)
        logger.info(f'Got {blob_name} blob from {container_name} container.')
        return blob_client

    @staticmethod
    def _build_sas_url(account_url, container_name, blob_name, sas):
        """
        Join an account URL, container, blob name and SAS token into a single URL.

        `Args:`
            account_url: str
                The account URL, with or without a trailing slash
            container_name: str
                The container name
            blob_name: str
                The blob name
            sas: str
                The SAS token, already formatted as a query string
        `Returns:`
            str
        """

        # The account URL built in ``__init__`` ends with '/', so strip it to avoid
        # producing 'https://host//container/blob'.
        base_url = account_url.rstrip('/')
        # The SAS token is itself the query string, so it is appended directly after '?'
        # instead of being nested under another parameter name.
        return f'{base_url}/{container_name}/{blob_name}?{sas}'

    def get_blob_url(self, container_name, blob_name, account_key=None, permission=None,
                     expiry=None, start=None):
        """
        Get a URL with a shared access signature for a blob

        `Args:`
            container_name: str
                The container name
            blob_name: str
                The blob name
            account_key: Optional[str]
                An account shared access key for the storage account. Will default to the key used
                on initialization if one was provided as the credential, but required if it was not.
            permission: Optional[Union[BlobSasPermissions, str]]
                Permissions associated with the blob URL. Can be either a BlobSasPermissions object
                or a string where 'r', 'a', 'c', 'w', and 'd' correspond to read, add, create,
                write, and delete permissions respectively.
            expiry: Optional[Union[datetime, str]]
                The datetime when the URL should expire. Defaults to UTC.
            start: Optional[Union[datetime, str]]
                The datetime when the URL should become valid. Defaults to UTC. If it is ``None``,
                the URL becomes active when it is first created.
        `Returns:`
            str
                URL with shared access signature for blob, of the form
                ``<account_url>/<container_name>/<blob_name>?<sas token>``
        `Raises:`
            ValueError
                If no ``account_key`` is given and no credential was set on initialization
        """

        if not account_key:
            if not self.credential:
                raise ValueError(
                    'An account shared access key must be provided if it was not on initialization'
                )
            # Fall back to the credential supplied when the connector was created.
            account_key = self.credential

        sas = generate_blob_sas(
            self.account_name,
            container_name,
            blob_name,
            account_key=account_key,
            permission=permission,
            expiry=expiry,
            start=start,
        )
        return self._build_sas_url(self.account_url, container_name, blob_name, sas)

    def _get_content_settings_from_dict(self, kwargs_dict):
        """
        Removes any keys for ``ContentSettings`` from a dict and returns a tuple of the generated
        settings or ``None`` and a dict with the settings keys removed.

        `Args:`
            kwargs_dict: dict
                A dict which should be processed and may have keys for ``ContentSettings``
        `Returns:`
            Tuple[Optional[ContentSettings], dict]
                Any created settings or ``None`` and the dict with settings keys removed
        """

        content_settings_keys = (
            'content_type', 'content_encoding', 'content_language', 'content_disposition',
            'cache_control', 'content_md5',
        )

        # Work on a shallow copy so the caller's dict is never mutated.
        kwargs_copy = dict(kwargs_dict)
        content_settings_dict = {
            key: kwargs_copy.pop(key) for key in content_settings_keys if key in kwargs_copy
        }

        # Only build a ContentSettings object when at least one setting was supplied.
        content_settings = None
        if content_settings_dict:
            content_settings = ContentSettings(**content_settings_dict)

        return content_settings, kwargs_copy

    def put_blob(self, container_name, blob_name, local_path, **kwargs):
        """
        Puts a blob (aka file) in a container

        `Args:`
            container_name: str
                The name of the container to store the blob
            blob_name: str
                The name of the blob to be stored
            local_path: str
                The local path of the file to upload
            kwargs:
                Additional arguments to be supplied to the Azure Blob Storage API. See `Azure Blob Storage
                SDK documentation <https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#upload-blob-data--blob-type--blobtype-blockblob---blockblob----length-none--metadata-none----kwargs->`_
                for more info. Any keys that belong to the ``ContentSettings`` object will be
                provided to that class directly.
        `Returns:`
            `BlobClient`
        """  # noqa

        blob_client = self.get_blob(container_name, blob_name)

        # Move all content_settings keys into a ContentSettings object
        content_settings, kwargs_dict = self._get_content_settings_from_dict(kwargs)

        # Hand the open file handle to the SDK so the upload is streamed instead of
        # loading the whole file into memory first. Any existing blob is overwritten.
        with open(local_path, 'rb') as f:
            blob_client.upload_blob(
                f,
                overwrite=True,
                content_settings=content_settings,
                **kwargs_dict,
            )

        logger.info(f'{blob_name} blob put in {container_name} container')

        # Return refreshed BlobClient object
        return self.get_blob(container_name, blob_name)

    def download_blob(self, container_name, blob_name, local_path=None):
        """
        Downloads a blob from a container into the specified file path or a temporary file path

        `Args:`
            container_name: str
                The container name
            blob_name: str
                The blob name
            local_path: Optional[str]
                The local path where the file will be downloaded. If not specified, a temporary
                file will be created and returned, and that file will be removed automatically
                when the script is done running.
        `Returns:`
            str
                The path of the downloaded file
        """

        if not local_path:
            local_path = files.create_temp_file_for_path('TEMPFILEAZURE')

        blob_client = self.get_blob(container_name, blob_name)

        logger.info(f'Downloading {blob_name} blob from {container_name} container.')
        with open(local_path, 'wb') as f:
            blob_client.download_blob().readinto(f)
        logger.info(f'{blob_name} blob saved to {local_path}.')

        return local_path

    def delete_blob(self, container_name, blob_name):
        """
        Delete a blob in a specified container.

        `Args:`
            container_name: str
                The container name
            blob_name: str
                The blob name
        `Returns:`
            ``None``
        """

        blob_client = self.get_blob(container_name, blob_name)
        blob_client.delete_blob()
        logger.info(f'{blob_name} blob in {container_name} container deleted.')

    def upload_table(self, table, container_name, blob_name, data_type='csv', **kwargs):
        """
        Load the data from a Parsons table into a blob.

        `Args:`
            table: obj
                A :ref:`parsons-table`
            container_name: str
                The container name to upload the data into
            blob_name: str
                The blob name to upload the data into
            data_type: str
                The file format to use when writing the data. One of: `csv` or `json`
            kwargs:
                Additional keyword arguments to supply to ``put_blob``
        `Returns:`
            `BlobClient`
        `Raises:`
            ValueError
                If ``data_type`` is neither `csv` nor `json`
        """

        # Write the table to a local file and pick the matching content type.
        if data_type == 'csv':
            local_path = table.to_csv()
            content_type = 'text/csv'
        elif data_type == 'json':
            local_path = table.to_json()
            content_type = 'application/json'
        else:
            raise ValueError(f'Unknown data_type value ({data_type}): must be one of: csv or json')

        return self.put_blob(
            container_name, blob_name, local_path, content_type=content_type, **kwargs
        )