Source code for parsons.ngpvan.email

from parsons.etl.table import Table
import logging

logger = logging.getLogger(__name__)


[docs] class Email(object): """ Instantiate the Email class. You can find the docs for the NGP VAN Email API here: https://docs.ngpvan.com/reference/email-overview """ def __init__(self, van_connection): self.connection = van_connection
[docs] def get_emails(self, ascending: bool = True) -> Table: """ Get emails. `Args:` ascending : Bool sorts results in ascending or descending order for the dateModified field. Defaults to True (ascending). `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ if ascending: params = { "$orderby": "dateModified asc", } if not ascending: params = { "$orderby": "dateModified desc", } tbl = Table(self.connection.get_request("email/messages", params=params)) logger.debug(f"Found {tbl.num_rows} emails.") return tbl
[docs] def get_email(self, email_id: int, expand: bool = True) -> Table: """ Get an email. Note that it takes some time for the system to aggregate opens and click-throughs, so data can be delayed up to 15 minutes. `Args:` email_id : int The email id. expand : bool Optional; expands the email message to include the email content and statistics. Defaults to True. `Returns:` dict """ params = { "$expand": ( "emailMessageContent, EmailMessageContentDistributions" if expand else None ), } r = self.connection.get_request(f"email/message/{email_id}", params=params) logger.debug(f"Found email {email_id}.") return r
[docs] def get_email_stats(self, aggregate_ab: bool = True) -> Table: """ Get stats for all emails, aggregating any A/B tests. `Args:` aggregate_ab : bool If A/B test results for emails should get aggregated. `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ email_list = [] final_email_list = [] emails = self.get_emails() foreign_message_ids = [email["foreignMessageId"] for email in emails] for fmid in foreign_message_ids: email = self.get_email(fmid) email_list.append(email) # Outside and inside emailMessageContentDistributions field outer_fields = [ "name", "createdBy", "dateCreated", "dateModified", "dateScheduled", "foreignMessageId", ] inner_fields = [ "recipientCount", "bounceCount", "contributionCount", "contributionTotal", "formSubmissionCount", "linksClickedCount", "machineOpenCount", "openCount", "unsubscribeCount", # "subject" # included here for clarity, but has some special logic ] # If we are aggregating, we have one entry per foreignMessageId (outer loop) and # sum over the values inside the inner loop. If we are not, then we need to loop # over foreignMessageId and each component of emailMessageContent, so we loop # over both and pull out data (with no aggregation) for each. if aggregate_ab: for email in email_list: # One row per foreignMessageId outer = {field: email[field] for field in outer_fields} inner = {field: 0 for field in inner_fields} for i in email["emailMessageContent"]: try: for field in inner_fields: # Aggregation of all inner values inner[field] += i["emailMessageContentDistributions"][field] # Just replacing subject to get the last one inner["subject"] = i["subject"] except KeyError as e: logger.info(str(e)) pass final_email_list.append({**outer, **inner}) else: for email in email_list: for i in email["emailMessageContent"]: # One row per foreignMessageId / emailMessageContent entry outer = {field: email[field] for field in outer_fields} inner = {field: 0 for field in inner_fields} try: for field in inner_fields: inner[field] = i["emailMessageContentDistributions"][field] inner["subject"] = i["subject"] except KeyError as e: logger.info(str(e)) final_email_list.append({**outer, **inner}) return Table(final_email_list)