Standardizing Input Values with Pyspark

Mapping column values

Often we want to map one value to another, for example to standardize inconsistently formatted input. This can be done in numerous ways. We'll consider a few and look at their performance impact.

The code can be imported as a Databricks Notebook with the source here.

Setup

Create a dataframe with inconsistent values we want to standardize/map.

from pyspark.sql import functions as f

# `spark` is the SparkSession that Databricks notebooks provide
df = spark.createDataFrame([(1, 2, 'Visa', 4), (1, 2, 'VISA', 5), (1, 2, 'MasterCard', 6), (1, 2, 'Amex', 7), (1, 2, 'Cash', 100), (1, 2, 'Weird - CAD $', 101)], ['a', 'b', 'tender_type', 'tender_amount'])

df.display()  # Databricks-only; use df.show() elsewhere

TENDER_TYPE = 'tender_type'
# Standard spellings that input values should be mapped to
CREDIT_AND_DEBIT_CARD_TYPES = ['AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $', *CREDIT_AND_DEBIT_CARD_TYPES]

For example, we want to map Amex to AMEX. The input DataFrame looks like this:

+---+---+-------------+-------------+
|  a|  b|  tender_type|tender_amount|
+---+---+-------------+-------------+
|  1|  2|         Visa|            4|
|  1|  2|         VISA|            5|
|  1|  2|   MasterCard|            6|
|  1|  2|         Amex|            7|
|  1|  2|         Cash|          100|
|  1|  2|Weird - CAD $|          101|
+---+---+-------------+-------------+
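Before turning to Spark, the goal can be modeled in plain Python: normalize each raw value (strip surrounding whitespace, lower-case it) and look it up in a dictionary keyed by the lower-cased standard spellings. This is only a sketch of the intended behavior, not one of the Spark options below; the PAYMENT_TYPES list is copied from the setup, and the helper name `standardize` is hypothetical.

```python
# Plain-Python model of the standardization (no Spark involved).
CREDIT_AND_DEBIT_CARD_TYPES = ['AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $', *CREDIT_AND_DEBIT_CARD_TYPES]

# Key: lower-cased spelling; value: standard spelling.
LOOKUP = {pt.lower(): pt for pt in PAYMENT_TYPES}

def standardize(raw):
    """Hypothetical helper: strip, lower-case, then map to the standard spelling.

    Returns None for values that are not in PAYMENT_TYPES.
    """
    return LOOKUP.get(raw.strip().lower())

raw_values = ['Visa', 'VISA', 'MasterCard', 'Amex', 'Cash', 'Weird - CAD $']
print([standardize(v) for v in raw_values])
# ['VISA', 'VISA', 'Mastercard', 'AMEX', 'Cash', 'Weird - CAD $']
```

The expected list matches the tender_type column the Spark options below produce for this input.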

Option 1 - Loop with Regex Replace

This is by far the simplest approach: loop through the standard values, trim leading and trailing whitespace, then perform a case-insensitive regular-expression replacement on the tender_type values.

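df1 = df
for c in PAYMENT_TYPES:
  # Trim whitespace (repeated on every iteration), then replace any case-insensitive
  # whole-word match of c with its standard spelling c.
  # Note: c is interpolated unescaped, so characters such as '$' are treated as regex syntax.
  df1 = df1.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
       .withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE))) \
       .withColumn(TENDER_TYPE, f.regexp_replace(TENDER_TYPE, fr"(?i)\b{c}\b", c))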

df1.display()
df1.explain(mode="cost")

This yields:

+---+---+-------------+-------------+
|  a|  b|  tender_type|tender_amount|
+---+---+-------------+-------------+
|  1|  2|         VISA|            4|
|  1|  2|         VISA|            5|
|  1|  2|   Mastercard|            6|
|  1|  2|         AMEX|            7|
|  1|  2|         Cash|          100|
|  1|  2|Weird - CAD $|          101|
+---+---+-------------+-------------+
== Optimized Logical Plan ==
Project [a#710287L, b#710288L, regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(tender_type#710289, None), None), (?i)\bCash\b, Cash, 1), None), None), (?i)\bSBUX Card\b, SBUX Card, 1), None), None), (?i)\bNA\b, NA, 1), None), None), (?i)\bWeird - CAD $\b, Weird - CAD $, 1), None), None), (?i)\bWeird - USD $\b, Weird - USD $, 1), None), None), (?i)\bAMEX\b, AMEX, 1), None), None), (?i)\bMastercard\b, Mastercard, 1), None), None), (?i)\bVISA\b, VISA, 1), None), None), (?i)\bDiscover\b, Discover, 1), None), None), (?i)\bDebit\b, Debit, 1), None), None), (?i)\bDebit/Credit\b, Debit/Credit, 1) AS tender_type#710464, tender_amount#710290L], Statistics(sizeInBytes=8.0 EiB)
+- LogicalRDD [a#710287L, b#710288L, tender_type#710289, tender_amount#710290L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) Project [a#710287L, b#710288L, regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(tender_type#710289, None), None), (?i)\bCash\b, Cash, 1), None), None), (?i)\bSBUX Card\b, SBUX Card, 1), None), None), (?i)\bNA\b, NA, 1), None), None), (?i)\bWeird - CAD $\b, Weird - CAD $, 1), None), None), (?i)\bWeird - USD $\b, Weird - USD $, 1), None), None), (?i)\bAMEX\b, AMEX, 1), None), None), (?i)\bMastercard\b, Mastercard, 1), None), None), (?i)\bVISA\b, VISA, 1), None), None), (?i)\bDiscover\b, Discover, 1), None), None), (?i)\bDebit\b, Debit, 1), None), None), (?i)\bDebit/Credit\b, Debit/Credit, 1) AS tender_type#710464, tender_amount#710290L]
+- *(1) Scan ExistingRDD[a#710287L,b#710288L,tender_type#710289,tender_amount#710290L]
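One caveat with Option 1: each payment type is interpolated into the pattern unescaped, so `$` in 'Weird - CAD $' is an end-of-input anchor rather than a literal dollar sign, and `\b` needs a word character on one side, which a value ending in `$` cannot provide. The sketch below reproduces this with Python's `re` module, assuming it behaves like Spark's Java-based regex engine for the constructs used here (`(?i)`, `\b`, `$` and lookarounds). The original pattern never matches 'weird - cad $'; an escaped pattern with `(?<!\w)`/`(?!\w)` lookarounds in place of `\b` does. The helper names are hypothetical, and I haven't verified the escaped pattern against Spark itself. In Spark the replacement string is also interpreted by Java's regex machinery, where `$` introduces a group reference, so it would likely need escaping too (for example as `\$`).

```python
import re

def naive_pattern(value):
    # Mirrors Option 1: the value is interpolated without escaping.
    return fr"(?i)\b{value}\b"

def escaped_pattern(value):
    # Hypothetical fix: escape regex metacharacters and use lookarounds, which only
    # assert "no word character" next to the match, instead of \b.
    return fr"(?i)(?<!\w){re.escape(value)}(?!\w)"

standard = "Weird - CAD $"
raw = "weird - cad $"

# In the naive pattern '$' is an anchor, not a literal dollar sign, so nothing matches.
print(re.sub(naive_pattern(standard), lambda m: standard, raw))    # weird - cad $
# The escaped pattern matches the whole value case-insensitively.
print(re.sub(escaped_pattern(standard), lambda m: standard, raw))  # Weird - CAD $
```

Using a function as the replacement keeps Python from interpreting backslashes in the standard spelling.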

Option 2 - Lowercase the values and create a map

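This solution replaces the chain of regular-expression replacements with a single map lookup, which should be cheaper per row, at the cost of lower-casing every value and expecting every value to be one of PAYMENT_TYPES. With Spark's default settings, a value that has no key in the map comes back as null.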

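# Normalize the input: trim whitespace and lower-case it
df3 = df.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
    .withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE))) \
    .withColumn(TENDER_TYPE, f.lower(f.col(TENDER_TYPE)))

# Pair each lower-case literal key with its standard-spelling literal value
z = [*zip([f.lit(pt.lower()) for pt in PAYMENT_TYPES], [f.lit(pt) for pt in PAYMENT_TYPES])]
# Flatten the pairs into one tuple: (key1, value1, key2, value2, ...)
tup = sum(z, ())
# create_map takes alternating key and value columns
map_col = f.create_map(*tup)

# Look up each normalized tender_type in the map
df4 = df3.withColumn(TENDER_TYPE, map_col[f.col(TENDER_TYPE)])

df4.show()
df4.explain(mode="cost")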

This creates a list of tuples, each pairing a lower-case column literal with the corresponding literal in the expected case:

[(Column<'cash'>, Column<'Cash'>), (Column<'sbux card'>, Column<'SBUX Card'>), (Column<'na'>, Column<'NA'>), (Column<'weird - cad $'>, Column<'Weird - CAD $'>), (Column<'weird - usd $'>, Column<'Weird - USD $'>), (Column<'amex'>, Column<'AMEX'>), (Column<'mastercard'>, Column<'Mastercard'>), (Column<'visa'>, Column<'VISA'>), (Column<'discover'>, Column<'Discover'>), (Column<'debit'>, Column<'Debit'>), (Column<'debit/credit'>, Column<'Debit/Credit'>)]

We then merge them into one flat tuple with sum(z, ()), which works because adding tuples concatenates them:

(Column<'cash'>, Column<'Cash'>, Column<'sbux card'>, Column<'SBUX Card'>, Column<'na'>, Column<'NA'>, Column<'weird - cad $'>, Column<'Weird - CAD $'>, Column<'weird - usd $'>, Column<'Weird - USD $'>, Column<'amex'>, Column<'AMEX'>, Column<'mastercard'>, Column<'Mastercard'>, Column<'visa'>, Column<'VISA'>, Column<'discover'>, Column<'Discover'>, Column<'debit'>, Column<'Debit'>, Column<'debit/credit'>, Column<'Debit/Credit'>)
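sum(z, ()) builds a new tuple at every step, so it is quadratic in the number of pairs. That hardly matters for a handful of payment types, but itertools.chain.from_iterable is a linear alternative that produces the same flat sequence. The sketch below checks this with plain strings standing in for the f.lit(...) columns, an assumption made so it runs without Spark, and rebuilds the key/value pairing that create_map reads from the alternating sequence.

```python
from itertools import chain

CREDIT_AND_DEBIT_CARD_TYPES = ['AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $', *CREDIT_AND_DEBIT_CARD_TYPES]

# Plain strings stand in for the f.lit(...) columns used in the article.
z = list(zip([pt.lower() for pt in PAYMENT_TYPES], PAYMENT_TYPES))

flat_sum = sum(z, ())                       # repeated tuple concatenation
flat_chain = tuple(chain.from_iterable(z))  # single linear pass
assert flat_sum == flat_chain

# create_map reads the flat sequence as alternating keys and values.
as_dict = dict(zip(flat_chain[::2], flat_chain[1::2]))
print(as_dict['mastercard'])  # Mastercard
```

In the PySpark code, the equivalent would presumably be f.create_map(*chain.from_iterable(z)), with z holding the f.lit columns as before.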

create_map treats that tuple as alternating keys and values, producing a map column from each lower-case payment type to its standard spelling in PAYMENT_TYPES. Looking up the normalized tender_type in this map yields:

+---+---+-------------+-------------+
|  a|  b|  tender_type|tender_amount|
+---+---+-------------+-------------+
|  1|  2|         VISA|            4|
|  1|  2|         VISA|            5|
|  1|  2|   Mastercard|            6|
|  1|  2|         AMEX|            7|
|  1|  2|         Cash|          100|
|  1|  2|Weird - CAD $|          101|
+---+---+-------------+-------------+

== Optimized Logical Plan ==
Project [a#712012L, b#712013L, map(keys: [cash,sbux card,na,weird - cad $,weird - usd $,amex,mastercard,visa,discover,debit,debit/credit], values: [Cash,SBUX Card,NA,Weird - CAD $,Weird - USD $,AMEX,Mastercard,VISA,Discover,Debit,Debit/Credit])[lower(rtrim(ltrim(tender_type#712014, None), None))] AS tender_type#712590, tender_amount#712015L], Statistics(sizeInBytes=8.0 EiB)
+- LogicalRDD [a#712012L, b#712013L, tender_type#712014, tender_amount#712015L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) Project [a#712012L, b#712013L, map(keys: [cash,sbux card,na,weird - cad $,weird - usd $,amex,mastercard,visa,discover,debit,debit/credit], values: [Cash,SBUX Card,NA,Weird - CAD $,Weird - USD $,AMEX,Mastercard,VISA,Discover,Debit,Debit/Credit])[lower(rtrim(ltrim(tender_type#712014, None), None))] AS tender_type#712590, tender_amount#712015L]
+- *(1) Scan ExistingRDD[a#712012L,b#712013L,tender_type#712014,tender_amount#712015L]

Option 3 - Programmatically create case statements

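This approach still requires a case-insensitive comparison: each step lower-cases tender_type and compares it with the lower-cased payment type. Unlike Option 2, a value that matches no payment type is left unchanged rather than becoming null.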

from functools import reduce

df5 = df.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
    .withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE)))

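def add_column_safely(df, payment_type):
  # If tender_type matches payment_type case-insensitively, replace it with the
  # standard spelling; otherwise keep the current value (one CASE WHEN per call)
  return df.withColumn(TENDER_TYPE, f.when(f.lower(f.col(TENDER_TYPE)) == payment_type.lower(), f.lit(payment_type))
    .otherwise(f.col(TENDER_TYPE)))

# Fold add_column_safely over every payment type, starting from the trimmed df5
df6 = reduce(add_column_safely, PAYMENT_TYPES, df5)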

df6.display()
df6.explain(mode="cost")

Yields:

+---+---+-------------+-------------+
|  a|  b|  tender_type|tender_amount|
+---+---+-------------+-------------+
|  1|  2|         VISA|            4|
|  1|  2|         VISA|            5|
|  1|  2|   Mastercard|            6|
|  1|  2|         AMEX|            7|
|  1|  2|         Cash|          100|
|  1|  2|Weird - CAD $|          101|
+---+---+-------------+-------------+

== Optimized Logical Plan ==
Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713587) = debit/credit) THEN Debit/Credit ELSE tender_type#713587 END AS tender_type#713592, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713582) = debit) THEN Debit ELSE tender_type#713582 END AS tender_type#713587, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
   +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713577) = discover) THEN Discover ELSE tender_type#713577 END AS tender_type#713582, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
      +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713572) = visa) THEN VISA ELSE tender_type#713572 END AS tender_type#713577, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
         +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713567) = mastercard) THEN Mastercard ELSE tender_type#713567 END AS tender_type#713572, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
            +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713562) = amex) THEN AMEX ELSE tender_type#713562 END AS tender_type#713567, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
               +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713557) = weird - usd $) THEN Weird - USD $ ELSE tender_type#713557 END AS tender_type#713562, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
                  +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713552) = weird - cad $) THEN Weird - CAD $ ELSE tender_type#713552 END AS tender_type#713557, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
                     +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713547) = na) THEN NA ELSE tender_type#713547 END AS tender_type#713552, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
                        +- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713542) = cash) THEN Cash ELSE tender_type#713542 END AS tender_type#713547, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
                           +- Project [a#712760L, b#712761L, rtrim(ltrim(tender_type#712762, None), None) AS tender_type#713542, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
                              +- LogicalRDD [a#712760L, b#712761L, tender_type#712762, tender_amount#712763L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713587) = debit/credit) THEN Debit/Credit ELSE tender_type#713587 END AS tender_type#713592, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713582) = debit) THEN Debit ELSE tender_type#713582 END AS tender_type#713587, tender_amount#712763L]
   +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713577) = discover) THEN Discover ELSE tender_type#713577 END AS tender_type#713582, tender_amount#712763L]
      +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713572) = visa) THEN VISA ELSE tender_type#713572 END AS tender_type#713577, tender_amount#712763L]
         +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713567) = mastercard) THEN Mastercard ELSE tender_type#713567 END AS tender_type#713572, tender_amount#712763L]
            +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713562) = amex) THEN AMEX ELSE tender_type#713562 END AS tender_type#713567, tender_amount#712763L]
               +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713557) = weird - usd $) THEN Weird - USD $ ELSE tender_type#713557 END AS tender_type#713562, tender_amount#712763L]
                  +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713552) = weird - cad $) THEN Weird - CAD $ ELSE tender_type#713552 END AS tender_type#713557, tender_amount#712763L]
                     +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713547) = na) THEN NA ELSE tender_type#713547 END AS tender_type#713552, tender_amount#712763L]
                        +- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713542) = cash) THEN Cash ELSE tender_type#713542 END AS tender_type#713547, tender_amount#712763L]
                           +- *(1) Project [a#712760L, b#712761L, rtrim(ltrim(tender_type#712762, None), None) AS tender_type#713542, tender_amount#712763L]
                              +- *(1) Scan ExistingRDD[a#712760L,b#712761L,tender_type#712762,tender_amount#712763L]
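Option 3's reduce applies one CASE WHEN per payment type to the column, which is why its plans contain one nested Project per payment type. The plain-Python sketch below models that fold and compares it with a single pass that checks all payment types at once, the shape a single multi-branch expression would have. In PySpark that would presumably mean chaining .when(...) calls on one Column before a final .otherwise(f.col(TENDER_TYPE)); I haven't checked how Catalyst plans that variant. The function names are hypothetical, and str.strip() stands in for ltrim/rtrim.

```python
from functools import reduce

CREDIT_AND_DEBIT_CARD_TYPES = ['AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $', *CREDIT_AND_DEBIT_CARD_TYPES]

def case_when_step(value, payment_type):
    """Models one CASE WHEN: replace on a case-insensitive match, else keep the value."""
    return payment_type if value.lower() == payment_type.lower() else value

def folded(value):
    # Mirrors reduce(add_column_safely, PAYMENT_TYPES, df5): one step per payment type.
    return reduce(case_when_step, PAYMENT_TYPES, value.strip())

LOOKUP = {pt.lower(): pt for pt in PAYMENT_TYPES}

def single_pass(value):
    # One lookup over all payment types; unknown values are kept, as in Option 3.
    trimmed = value.strip()
    return LOOKUP.get(trimmed.lower(), trimmed)

samples = ['Visa', ' VISA ', 'MasterCard', 'Amex', 'Cash', 'Weird - CAD $', 'Bitcoin']
print([folded(v) for v in samples])
# ['VISA', 'VISA', 'Mastercard', 'AMEX', 'Cash', 'Weird - CAD $', 'Bitcoin']
```

Both functions agree here because no two payment types share the same lower-case spelling, so once a value has been standardized no later step can change it again.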

Execution Plans

Optimized Logical Plan

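The optimized logical plan describes the logical operations (projections, filters, aggregations, and so on) after the Catalyst optimizer has applied its rules, which may reorder, combine, or push down operations. For example, in Option 1 the chained withColumn calls have been collapsed into a single Project. This plan is then used to create the physical plan.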
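With mode="cost", each logical operator is annotated with Statistics(sizeInBytes=...). The 8.0 EiB in every plan above is not a measurement: as far as I know, a DataFrame built from local data like this one has no size estimate, so Spark falls back to spark.sql.defaultSizeInBytes, which defaults to Java's Long.MaxValue bytes. The arithmetic below, a sketch under that assumption, shows this value is exactly what prints as 8.0 EiB, so these statistics say nothing about the relative cost of the three options.

```python
# Java's Long.MaxValue, assumed here to be the default of spark.sql.defaultSizeInBytes
LONG_MAX_VALUE = 2**63 - 1
EIB = 2**60  # one exbibyte in bytes

size_in_eib = LONG_MAX_VALUE / EIB
print(f"{size_in_eib:.1f} EiB")  # 8.0 EiB
```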

Physical Plan

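From the optimized logical plan, Spark generates a physical plan that describes how the query will actually be executed on the cluster. Catalyst's planner applies various strategies to turn logical operators into physical ones and, in principle, can produce several candidate physical plans of which one is selected for execution; in current Spark versions it largely takes the first candidate, with statistics-driven decisions (such as the join strategy) made inside individual strategies. This is the plan where exchanges (shuffles) between nodes and lower-level scans such as Scan ExistingRDD appear. None of the plans above contains an Exchange, because each option is a narrow, row-by-row projection, and the *(1) prefix marks operators that run together in the same whole-stage code generation stage.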

Summary

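According to the execution plans, options 1 and 2 don't look any different in cost: both compile to a single Project over the scan, inside one whole-stage code generation stage. The per-row work still differs (option 1 evaluates eleven regular-expression replacements and repeated trims, option 2 a single trim, lower-case and map lookup), so timing both on realistic data would be needed to confirm which is faster. I tend to avoid loops in Spark code when possible, but in this case the Catalyst optimizer does a good job of collapsing the loop's withColumn calls into the same single projection we would get without a loop. You should still be careful about what you are looping over and what you're trying to accomplish, and examine the execution plans to spot bottlenecks. Option 3 runs without error in the notebook, but with a large number of values in PAYMENT_TYPES and larger input DataFrames I ran into some issues; note that its plan is not collapsed and keeps one nested Project per payment type. Originally I wanted to avoid lower-casing the DataFrame at all; however, the impact wasn't terrible.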
