Source code for parsons.airtable.airtable

from pyairtable import Api as client
from parsons.etl import Table
from parsons.utilities import check_env
import logging


logger = logging.getLogger(__name__)


[docs]class Airtable(object): """ `Args:` base_key: str The key/ID of the Airtable base that you will interact with, typically prefixed with `app`. table_name: str The name or key/ID of the table in the base. The table name is the equivalent of the sheet name in Excel or GoogleDocs. The ID can be found in the URL and is typically prefixed with `tbl`. personal_access_token: str The Airtable personal access token. Not required if ``AIRTABLE_PERSONAL_ACCESS_TOKEN`` env variable set. """ def __init__(self, base_key, table_name, personal_access_token=None): self.personal_access_token = check_env.check( "AIRTABLE_PERSONAL_ACCESS_TOKEN", personal_access_token ) self.client = client(self.personal_access_token).table(base_key, table_name)
[docs] def get_record(self, record_id): """ Returns a single record. `Args:` record_id: str The Airtable record `id` `Returns:` A dictionary of the record """ return self.client.get(record_id)
[docs] def get_records( self, fields=None, max_records=None, view=None, formula=None, sort=None, sample_size=None, ): """ `Args:` fields: str or lst Only return specified column or list of columns. The column name is case sensitive max_records: int The maximum total number of records that will be returned. view: str If set, only the records in that view will be returned. The records will be sorted according to the order of the view. formula: str The formula will be evaluated for each record, and if the result is not 0, false, "", NaN, [], or #Error! the record will be included in the response. If combined with view, only records in that view which satisfy the formula will be returned. For example, to only include records where ``COLUMN_A`` isn't empty, pass in: ``"NOT({COLUMN_A}='')"`` For more information see `Airtable Docs on formulas. <https://airtable.com/api>`_ Usage - Text Column is not empty: ``airtable.get_all(formula="NOT({COLUMN_A}='')")`` Usage - Text Column contains: ``airtable.get_all(formula="FIND('SomeSubText', {COLUMN_STR})=1")`` sort: str or lst Specifies how the records will be ordered. If you set the view parameter, the returned records in that view will be sorted by these fields. If sorting by multiple columns, column names can be passed as a list. Sorting Direction is ascending by default, but can be reversed by prefixing the column name with a minus sign -. Example usage: ``airtable.get_records(sort=['ColumnA', '-ColumnB'])`` sample_size: int Number of rows to sample before determining columns `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ if isinstance(fields, str): fields = [fields] # Raises an error if sort is None type. Thus, only adding if populated. kwargs = { "fields": fields, "max_records": max_records, "view": view, "formula": formula, } if sort: kwargs["sort"] = sort tbl = Table(self.client.all(**kwargs)) # If the results are empty, then return an empty table. if "fields" not in tbl.columns: return Table([[]]) unpack_dicts_kwargs = { "column": "fields", "prepend": False, } if fields: unpack_dicts_kwargs["keys"] = fields if sample_size: unpack_dicts_kwargs["sample_size"] = sample_size return tbl.unpack_dict(**unpack_dicts_kwargs)
[docs] def insert_record(self, row, typecast=False): """ Insert a single record into an Airtable. `Args:` row: dict Fields to insert. Must be dictionary with Column names as Key. typecast: boolean Automatic data conversion from string values. `Returns:` Dictionary of inserted row """ resp = self.client.create(row, typecast=typecast) logger.info("Record inserted") return resp
[docs] def insert_records(self, table, typecast=False): """ Insert multiple records into an Airtable. The columns in your Parsons table must exist in the Airtable. The method will attempt to map based on column name, so the order of the columns is irrelevant. `Args:` table: A Parsons Table or list of dicts Insert a Parsons table or list typecast: boolean Automatic data conversion from string values. `Returns:` List of dictionaries of inserted rows """ if isinstance(table, Table): table = table.to_dicts() resp = self.client.batch_create(table, typecast=typecast) logger.info(f"{len(table)} records inserted.") return resp
[docs] def update_record(self, record_id, fields, typecast=False, replace=False): """ Updates a record by its record `id`. Only Fields passed are updated, the rest are left as is. `Args:` record_id: str The Airtable record `id` fields: dict Fields to insert. Must be dictionary with Column names as Key. typecast: boolean Automatic data conversion from string values. replace: boolean Only provided fields are updated. If `True`, record is replaced in its entirety by provided fields; if a field is not included its value will bet set to null. `Returns:` Dictionary of updated row """ resp = self.client.update(record_id, fields, typecast=typecast, replace=replace) logger.info(f"{record_id} updated") return resp
[docs] def update_records(self, table, typecast=False, replace=False): """ Update multiple records into an Airtable. The columns in your Parsons table must exist in the Airtable, and the record `id` column must be present. The method will attempt to map based on column name, so the order of the columns is irrelevant. `Args:` table: A Parsons Table or list of dicts Insert a Parsons table or list. Record must contain the record `id` column and columns containing the fields to update typecast: boolean Automatic data conversion from string values. replace: boolean Only provided fields are updated. If `True`, record is replaced in its entirety by provided fields; if a field is not included its value will bet set to null. `Returns:` List of dicts of updated records """ # the update/upsert API call expects a dict/object shape of: # { id: string, fields: { column_name: value, ... } } # the map_update_fields helper will convert the flat table field # columns/keys into this nested structure table = list(map(map_update_fields, table)) resp = self.client.batch_update(table, typecast=typecast, replace=replace) logger.info(f"{len(resp)} records updated.") return resp
[docs] def upsert_records(self, table, key_fields=None, typecast=False, replace=False): """ Update and/or create records, either using `id` (if included) or using a set of fields (`key_fields`) to look for matches. The columns in your Parsons table must exist in the Airtable. The method will attempt to map based on column name, so the order of the columns is irrelevant. `Args:` table: A Parsons Table or list of dicts Parsons table or list with records to upsert. Records must contain the record `id` column or the column(s) defined in `key_fields`. key_fields: list of str List of field names that Airtable should use to match records in the input with existing records. typecast: boolean Automatic data conversion from string values. replace: boolean Only provided fields are updated. If `True`, record is replaced in its entirety by provided fields; if a field is not included its value will bet set to null. `Returns:` Dictionary containing: - `updated_records`: list of updated record `id`s - `created_records`: list of created records `id`s - `records`: list of records """ # the update/upsert API call expects a dict/object shape of: # { id: string, fields: { column_name: value, ... } } # the map_update_fields helper will convert the flat table field # columns/keys into this nested structure table = list(map(map_update_fields, table)) resp = self.client.batch_upsert(table, key_fields, typecast=typecast, replace=replace) updated_records = resp["updatedRecords"] created_records = resp["createdRecords"] logger.info( f"{len(updated_records)} records updated, {len(created_records)} records created." ) return { "records": resp["records"], "updated_records": updated_records, "created_records": created_records, }
[docs] def delete_record(self, record_id): """ Deletes a record by its record `id`. `Args:` record_id: str The Airtable record `id` `Returns:` Dictionary of record `id` and `deleted` status """ resp = self.client.delete(record_id) logger.info(f"{record_id} updated") return resp
[docs] def delete_records(self, table): """ Delete multiple records from an Airtable. `Args:` table: A Parsons Table or list containing the record `id`s to delete. `Returns:` List of dicts with record `id` and `deleted` status """ if isinstance(table, Table): table = table.to_dicts() # the API expects a list of ids which this method can accept directly; # otherwise if a table or list of dicts containing the `id` key/column # is provided then map the ids into the expected list of id strings. if any(isinstance(row, dict) for row in table): table = list(map(lambda row: row["id"], table)) resp = self.client.batch_delete(table) logger.info(f"{len(table)} records deleted.") return resp
def map_update_fields(record): record_id = record.get("id") if "id" in record: del record["id"] return {"id": record_id, "fields": record} return {"fields": record}