Source code for parsons.aws.s3

import logging
import os
import re

import boto3

from parsons.utilities import files

logger = logging.getLogger(__name__)


class AWSConnection(object):

    def __init__(self, aws_access_key_id=None,
                 aws_secret_access_key=None,
                 aws_session_token=None):

        # Order of operations for searching for keys:
        #   1. Look for keys passed as kwargs
        #   2. Look for env variables
        #   3. Look for aws config file
        # Boto3 handles 2 & 3, and should handle 1 on its own as well, but
        # that does not appear to work, so we pass the keys in explicitly.

        if aws_access_key_id and aws_secret_access_key:
            # The AWS session token isn't needed most of the time, so we'll check
            # for the env variable here instead of requiring it to be passed
            # whenever the aws_access_key_id and aws_secret_access_key are passed.
            if aws_session_token is None:
                aws_session_token = os.getenv('AWS_SESSION_TOKEN')

            self.session = boto3.Session(aws_access_key_id=aws_access_key_id,
                                         aws_secret_access_key=aws_secret_access_key,
                                         aws_session_token=aws_session_token)

        else:
            self.session = boto3.Session()
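
    # Usage sketch (the key values are placeholders, not real credentials):
    # explicit kwargs take precedence; otherwise boto3 falls back to env
    # variables and then to the aws config file, per the order above.
    #
    #   conn = AWSConnection(aws_access_key_id='AKIA...',
    #                        aws_secret_access_key='...')
    #   conn = AWSConnection()  # env variables or config file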


class S3(object):
    """
    Instantiate the S3 class.

    `Args:`
        aws_access_key_id: str
            The AWS access key id. Not required if the ``AWS_ACCESS_KEY_ID``
            env variable is set.
        aws_secret_access_key: str
            The AWS secret access key. Not required if the
            ``AWS_SECRET_ACCESS_KEY`` env variable is set.
        aws_session_token: str
            The AWS session token. Optional. Can also be stored in the
            ``AWS_SESSION_TOKEN`` env variable. Used for accessing S3 with
            temporary credentials.
    `Returns:`
        S3 class.
    """

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 aws_session_token=None):

        self.aws = AWSConnection(aws_access_key_id=aws_access_key_id,
                                 aws_secret_access_key=aws_secret_access_key,
                                 aws_session_token=aws_session_token)

        self.s3 = self.aws.session.resource('s3')
        """Boto3 API Session Resource object. Use for more advanced boto3 features."""

        self.client = self.s3.meta.client
        """Boto3 API Session client object. Use for more advanced boto3 features."""
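
    # Usage sketch: with ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``
    # set in the environment, no arguments are needed. The raw boto3 handles
    # remain available for anything this wrapper doesn't cover (the bucket
    # name below is hypothetical).
    #
    #   s3 = S3()
    #   s3.client.head_bucket(Bucket='my-bucket')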

    def list_buckets(self):
        """
        List all buckets to which you have access.

        `Returns:`
            list
        """

        return [bucket.name for bucket in self.s3.buckets.all()]

    def bucket_exists(self, bucket):
        """
        Determine if a bucket exists and you have access to it.

        `Args:`
            bucket: str
                The bucket name
        `Returns:`
            boolean
                ``True`` if the bucket exists and ``False`` if not.
        """

        try:
            # If we can list the keys, the bucket definitely exists. We do this
            # check since it accounts for buckets that live on other AWS
            # accounts that we have access to.
            self.list_keys(bucket)
            return True
        except Exception:
            pass

        return bucket in self.list_buckets()
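
    # Usage sketch (bucket name is hypothetical):
    #
    #   s3 = S3()
    #   if s3.bucket_exists('some-shared-bucket'):
    #       ...
    #
    # The list_keys() probe matters for cross-account access: a bucket owned
    # by another AWS account never shows up in list_buckets(), but listing
    # its keys succeeds if this account has been granted access.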

    def list_keys(self, bucket, prefix=None, suffix=None, regex=None,
                  date_modified_before=None, date_modified_after=None,
                  **kwargs):
        """
        List the keys in a bucket, along with extra info about each one.

        `Args:`
            bucket: str
                The bucket name
            prefix: str
                Limits the response to keys that begin with the specified prefix.
            suffix: str
                Limits the response to keys that end with the specified suffix.
            regex: str
                Limits the response to keys that match a regex pattern.
            date_modified_before: datetime.datetime
                Limits the response to keys with date modified before.
            date_modified_after: datetime.datetime
                Limits the response to keys with date modified after.
            kwargs:
                Additional arguments for the S3 API call. See `AWS ListObjectsV2
                documentation
                <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects_v2>`_
                for more info.
        `Returns:`
            dict
                Dict mapping the keys to info about each key. The info includes
                'LastModified', 'Size', and 'Owner'.
        """

        keys_dict = dict()
        logger.debug(f'Fetching keys in {bucket} bucket')

        continuation_token = None

        while True:
            args = {'Bucket': bucket}
            if prefix:
                args['Prefix'] = prefix
            if continuation_token:
                args['ContinuationToken'] = continuation_token
            args.update(kwargs)

            resp = self.client.list_objects_v2(**args)

            for key in resp.get('Contents', []):
                # Match suffix
                if suffix and not key['Key'].endswith(suffix):
                    continue

                # Regex matching
                if regex and not bool(re.search(regex, key['Key'])):
                    continue

                # Match date modified filters
                if date_modified_before and not key['LastModified'] < date_modified_before:
                    continue
                if date_modified_after and not key['LastModified'] > date_modified_after:
                    continue

                # Convert date to iso string
                key['LastModified'] = key['LastModified'].isoformat()

                # Add to output dict
                keys_dict[key.get('Key')] = key

            # If more than 1000 results, continue with token
            if resp.get('NextContinuationToken'):
                continuation_token = resp['NextContinuationToken']
            else:
                break

        logger.debug(f'Retrieved {len(keys_dict)} keys')
        return keys_dict
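
    # Usage sketch (bucket, prefix, and date are hypothetical): list the .csv
    # keys under a prefix that were modified after a cutoff. Note that
    # LastModified from S3 is timezone-aware, so the comparison datetime
    # needs a tzinfo.
    #
    #   from datetime import datetime, timezone
    #
    #   s3 = S3()
    #   keys = s3.list_keys(
    #       'my-bucket', prefix='exports/', suffix='.csv',
    #       date_modified_after=datetime(2021, 1, 1, tzinfo=timezone.utc))
    #   for key, info in keys.items():
    #       print(key, info['Size'])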

    def key_exists(self, bucket, key):
        """
        Determine if a key exists in a bucket.

        `Args:`
            bucket: str
                The bucket name
            key: str
                The object key
        `Returns:`
            boolean
                ``True`` if the key exists and ``False`` if not.
        """

        key_count = len(self.list_keys(bucket, prefix=key))

        if key_count > 0:
            logger.debug(f'Found {key} in {bucket}.')
            return True
        else:
            logger.debug(f'Did not find {key} in {bucket}.')
            return False

    def create_bucket(self, bucket):
        """
        Create an s3 bucket.

        .. warning::
            S3 has a limit on the number of buckets you can create in an AWS
            account, and that limit is fairly low (typically 100). If you are
            creating buckets frequently, you may be mis-using S3, and should
            consider using the same bucket for multiple tasks. There is no
            limit on the number of objects in a bucket.
            See `AWS bucket restrictions
            <https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html>`_
            for more info.

        .. warning::
            S3 bucket names are *globally* unique. So when creating a new
            bucket, the name can't collide with any existing bucket names. If
            the provided name does collide, you'll see errors like
            `IllegalLocationConstraintException` or `BucketAlreadyExists`.

        `Args:`
            bucket: str
                The name of the bucket to create
        `Returns:`
            ``None``
        """

        self.client.create_bucket(Bucket=bucket)

    def put_file(self, bucket, key, local_path, acl='bucket-owner-full-control',
                 **kwargs):
        """
        Uploads an object to an S3 bucket

        `Args:`
            bucket: str
                The bucket name
            key: str
                The object key
            local_path: str
                The local path of the file to upload
            acl: str
                The S3 permissions on the file
            kwargs:
                Additional arguments for the S3 API call. See `AWS Put Object
                documentation
                <https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html>`_
                for more info.
        """

        self.client.upload_file(local_path, bucket, key,
                                ExtraArgs={'ACL': acl, **kwargs})
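
    # Usage sketch (bucket and paths are hypothetical):
    #
    #   s3 = S3()
    #   s3.put_file('my-bucket', 'exports/contacts.csv', '/tmp/contacts.csv')
    #
    # Extra S3 arguments pass through to upload_file's ExtraArgs, e.g.
    # ServerSideEncryption='AES256'.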

    def remove_file(self, bucket, key):
        """
        Deletes an object from an S3 bucket

        `Args:`
            bucket: str
                The bucket name
            key: str
                The object key
        `Returns:`
            ``None``
        """

        self.client.delete_object(Bucket=bucket, Key=key)

    def get_file(self, bucket, key, local_path=None, **kwargs):
        """
        Download an object from S3 to a local file

        `Args:`
            local_path: str
                The local path where the file will be downloaded. If not
                specified, a temporary file will be created and returned, and
                that file will be removed automatically when the script is
                done running.
            bucket: str
                The bucket name
            key: str
                The object key
            kwargs:
                Additional arguments for the S3 API call. See `AWS
                download_file documentation
                <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_file>`_
                for more info.
        `Returns:`
            str
                The path of the new file
        """

        if not local_path:
            local_path = files.create_temp_file_for_path(key)

        self.s3.Object(bucket, key).download_file(local_path, ExtraArgs=kwargs)

        return local_path
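
    # Usage sketch (bucket and key are hypothetical): omitting local_path
    # downloads to a temp file that is cleaned up when the script exits.
    #
    #   s3 = S3()
    #   path = s3.get_file('my-bucket', 'exports/contacts.csv')
    #   with open(path) as f:
    #       print(f.readline())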

    def get_url(self, bucket, key, expires_in=3600):
        """
        Generates a presigned url for an s3 object.

        `Args:`
            bucket: str
                The bucket name
            key: str
                The object name
            expires_in: int
                The time, in seconds, until the url expires
        `Returns:`
            Url:
                A link to download the object
        """

        return self.client.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': bucket, 'Key': key},
            ExpiresIn=expires_in)
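
    # Usage sketch (bucket and key are hypothetical):
    #
    #   s3 = S3()
    #   url = s3.get_url('my-bucket', 'exports/contacts.csv', expires_in=600)
    #
    # Anyone holding the url can download the object until it expires,
    # without AWS credentials of their own.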

    def transfer_bucket(self, origin_bucket, origin_key, destination_bucket,
                        destination_key=None, suffix=None, regex=None,
                        date_modified_before=None, date_modified_after=None,
                        public_read=False, remove_original=False, **kwargs):
        """
        Transfer files between s3 buckets

        `Args:`
            origin_bucket: str
                The origin bucket
            origin_key: str
                The origin file or prefix
            destination_bucket: str
                The destination bucket
            destination_key: str
                If ``None``, the origin key is retained. If set to a prefix,
                all files are moved to the new prefix.
            suffix: str
                Limits the response to keys that end with the specified suffix.
            regex: str
                Limits the response to keys that match a regex pattern.
            date_modified_before: datetime.datetime
                Limits the response to keys with date modified before.
            date_modified_after: datetime.datetime
                Limits the response to keys with date modified after.
            public_read: bool
                If the keys should be set to `public-read`
            remove_original: bool
                If the original keys should be removed after transfer
            kwargs:
                Additional arguments for the S3 API call. See `AWS copy
                documentation
                <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy>`_
                for more info.
        `Returns:`
            ``None``
        """

        # If the origin key is a prefix, get all of the keys under it
        if origin_key.endswith('/'):
            resp = self.list_keys(
                origin_bucket,
                prefix=origin_key,
                suffix=suffix,
                regex=regex,
                date_modified_before=date_modified_before,
                date_modified_after=date_modified_after,
            )
            key_list = [value['Key'] for value in resp.values()]
        else:
            key_list = [origin_key]

        for key in key_list:
            # If destination_key is a prefix, swap it in for the origin prefix
            if destination_key and destination_key.endswith('/'):
                dest_key = key.replace(origin_key, destination_key)

            # If a single destination, use the destination key
            elif destination_key:
                dest_key = destination_key

            # Else use the key from the original source
            else:
                dest_key = key

            copy_source = {'Bucket': origin_bucket, 'Key': key}
            self.client.copy(copy_source, destination_bucket, dest_key,
                             ExtraArgs=kwargs)

            if remove_original:
                try:
                    self.remove_file(origin_bucket, key)
                except Exception as e:
                    logger.error('Failed to delete original key: ' + str(e))

            if public_read:
                object_acl = self.s3.ObjectAcl(destination_bucket, dest_key)
                object_acl.put(ACL='public-read')

        logger.info(f'Finished syncing {len(key_list)} keys')
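
    # Usage sketch (buckets and prefixes are hypothetical): move every .csv
    # under one prefix to a new prefix in another bucket, deleting the
    # originals as they are copied.
    #
    #   s3 = S3()
    #   s3.transfer_bucket('old-bucket', 'exports/', 'new-bucket',
    #                      destination_key='archive/', suffix='.csv',
    #                      remove_original=True)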