We often want to map one value to another. This can be done in numerous ways; we'll consider a few and look at their performance impact.
The code can be imported as a Databricks Notebook with the source here.
Setup
Create a dataframe with inconsistent values we want to standardize/map.
from pyspark.sql import functions as f

# `spark` is the SparkSession that Databricks notebooks provide
df = spark.createDataFrame([(1, 2, 'Visa', 4), (1, 2, 'VISA', 5), (1, 2, 'MasterCard', 6), (1, 2, 'Amex', 7), (1, 2, 'Cash', 100), (1, 2, 'Weird - CAD $', 101)], ['a', 'b', 'tender_type', 'tender_amount'])
df.display()
TENDER_TYPE = 'tender_type'
# Canonical spellings we want to map to
CREDIT_AND_DEBIT_CARD_TYPES = ['AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $', *CREDIT_AND_DEBIT_CARD_TYPES]
For example, we want to map Amex to AMEX. The input dataframe looks like this:
+---+---+-------------+-------------+
| a| b| tender_type|tender_amount|
+---+---+-------------+-------------+
| 1| 2| Visa| 4|
| 1| 2| VISA| 5|
| 1| 2| MasterCard| 6|
| 1| 2| Amex| 7|
| 1| 2| Cash| 100|
| 1| 2|Weird - CAD $| 101|
+---+---+-------------+-------------+
Option 1 - Loop with Regex Replace
This is by far the simplest approach. Loop through the standard values, trimming leading and trailing whitespace and then performing a case-insensitive regular expression replacement on the tender_type values.
df1 = df
for c in PAYMENT_TYPES:
    # Trim whitespace, then replace any case-insensitive whole-word match of c with c itself
    df1 = df1.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
        .withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE))) \
        .withColumn(TENDER_TYPE, f.regexp_replace(TENDER_TYPE, fr"(?i)\b{c}\b", c))
df1.display()
df1.explain(mode="cost")
This yields:
+---+---+-------------+-------------+
| a| b| tender_type|tender_amount|
+---+---+-------------+-------------+
| 1| 2| VISA| 4|
| 1| 2| VISA| 5|
| 1| 2| Mastercard| 6|
| 1| 2| AMEX| 7|
| 1| 2| Cash| 100|
| 1| 2|Weird - CAD $| 101|
+---+---+-------------+-------------+
== Optimized Logical Plan ==
Project [a#710287L, b#710288L, regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(tender_type#710289, None), None), (?i)\bCash\b, Cash, 1), None), None), (?i)\bSBUX Card\b, SBUX Card, 1), None), None), (?i)\bNA\b, NA, 1), None), None), (?i)\bWeird - CAD $\b, Weird - CAD $, 1), None), None), (?i)\bWeird - USD $\b, Weird - USD $, 1), None), None), (?i)\bAMEX\b, AMEX, 1), None), None), (?i)\bMastercard\b, Mastercard, 1), None), None), (?i)\bVISA\b, VISA, 1), None), None), (?i)\bDiscover\b, Discover, 1), None), None), (?i)\bDebit\b, Debit, 1), None), None), (?i)\bDebit/Credit\b, Debit/Credit, 1) AS tender_type#710464, tender_amount#710290L], Statistics(sizeInBytes=8.0 EiB)
+- LogicalRDD [a#710287L, b#710288L, tender_type#710289, tender_amount#710290L], false, Statistics(sizeInBytes=8.0 EiB)
== Physical Plan ==
*(1) Project [a#710287L, b#710288L, regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(regexp_replace(rtrim(ltrim(tender_type#710289, None), None), (?i)\bCash\b, Cash, 1), None), None), (?i)\bSBUX Card\b, SBUX Card, 1), None), None), (?i)\bNA\b, NA, 1), None), None), (?i)\bWeird - CAD $\b, Weird - CAD $, 1), None), None), (?i)\bWeird - USD $\b, Weird - USD $, 1), None), None), (?i)\bAMEX\b, AMEX, 1), None), None), (?i)\bMastercard\b, Mastercard, 1), None), None), (?i)\bVISA\b, VISA, 1), None), None), (?i)\bDiscover\b, Discover, 1), None), None), (?i)\bDebit\b, Debit, 1), None), None), (?i)\bDebit/Credit\b, Debit/Credit, 1) AS tender_type#710464, tender_amount#710290L]
+- *(1) Scan ExistingRDD[a#710287L,b#710288L,tender_type#710289,tender_amount#710290L]
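One caveat with this loop: `\b` only matches between a word character and a non-word character, and an unescaped `$` is an end-of-input anchor, so the pattern built for 'Weird - CAD $' (and 'Weird - USD $') can never match. The output above only looks right because that row was already spelled canonically. The sketch below shows this with Python's `re` module, on the assumption that Java's regex engine, which Spark's `regexp_replace` uses, treats `\b` and `$` the same way for these inputs. Escaping the value and replacing `\b` with the lookarounds `(?<!\w)` and `(?!\w)` is one possible fix; it is not something the original notebook does. In Spark the replacement string is handed to Java, where `$` starts a group reference, so a working pattern would probably also need the `$` in the replacement escaped; we have not verified that here.

```python
import re

VALUE = 'Weird - CAD $'

# Pattern as the loop builds it: `$` acts as an anchor here, not a literal dollar sign
loop_pattern = fr"(?i)\b{VALUE}\b"
print(re.sub(loop_pattern, VALUE, 'weird - cad $'))  # unchanged

# Escaping makes `$` literal, but `\b` still needs a word character next to the end of the string
escaped_pattern = fr"(?i)\b{re.escape(VALUE)}\b"
print(re.sub(escaped_pattern, VALUE, 'weird - cad $'))  # still unchanged

# Lookarounds only require "no word character on either side", which works for values ending in `$`
safe_pattern = fr"(?i)(?<!\w){re.escape(VALUE)}(?!\w)"
print(re.sub(safe_pattern, VALUE, 'weird - cad $'))  # Weird - CAD $
```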
Option 2 - Lowercase the values and create map
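This solution should perform better, since a single map lookup replaces the chain of regular expression replacements. The cost is that every value is lower-cased and is expected to be one of the values in PAYMENT_TYPES: with default settings, a value that has no matching key comes back as null.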
df3 = df.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
.withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE))) \
.withColumn(TENDER_TYPE, f.lower(f.col(TENDER_TYPE)))
z = [*zip([f.lit(pt.lower()) for pt in PAYMENT_TYPES], [f.lit(pt) for pt in PAYMENT_TYPES])]
tup = sum(z, ())
map_col = f.create_map(*tup)
df4 = df3.withColumn(TENDER_TYPE, map_col[f.col(TENDER_TYPE)])
df4.show()
df4.explain(mode="cost")
The zip creates a list of tuples, each pairing a lower-case column literal with its expected-case column literal:
[(Column<'cash'>, Column<'Cash'>), (Column<'na'>, Column<'NA'>), (Column<'weird - cad $'>, Column<'Weird - CAD $'>), (Column<'weird - usd $'>, Column<'Weird - USD $'>), (Column<'amex'>, Column<'AMEX'>), (Column<'mastercard'>, Column<'Mastercard'>), (Column<'visa'>, Column<'VISA'>), (Column<'discover'>, Column<'Discover'>), (Column<'debit'>, Column<'Debit'>), (Column<'debit/credit'>, Column<'Debit/Credit'>)]
We then merge them into one tuple using sum:
(Column<'cash'>, Column<'Cash'>, Column<'na'>, Column<'NA'>, Column<'weird - cad $'>, Column<'Weird - CAD $'>, Column<'weird - usd $'>, Column<'Weird - USD $'>, Column<'amex'>, Column<'AMEX'>, Column<'mastercard'>, Column<'Mastercard'>, Column<'visa'>, Column<'VISA'>, Column<'discover'>, Column<'Discover'>, Column<'debit'>, Column<'Debit'>, Column<'debit/credit'>, Column<'Debit/Credit'>)
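The `sum(z, ())` trick works because adding tuples concatenates them, although it builds a new tuple at every step. A commonly used alternative is `itertools.chain.from_iterable`. The sketch below compares the two with plain strings standing in for the `f.lit(...)` columns, so it runs without Spark; the point is only the flattened key, value, key, value order.

```python
from itertools import chain

PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $',
                 'AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']

# (lower-case key, canonical value) pairs; strings stand in for f.lit(...) columns
pairs = [(pt.lower(), pt) for pt in PAYMENT_TYPES]

# Both produce key1, value1, key2, value2, ... in a single flat tuple
via_sum = sum(pairs, ())
via_chain = tuple(chain.from_iterable(pairs))

print(via_chain[:4])  # ('cash', 'Cash', 'sbux card', 'SBUX Card')
```

With real columns, the chained version would be passed as `f.create_map(*[f.lit(x) for x in chain.from_iterable(...)])`.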
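Passing that flattened tuple to create_map builds a map column from the lower-case version of each payment type in PAYMENT_TYPES to its expected spelling. Indexing the map with the trimmed, lower-cased tender_type column performs the lookup.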
+---+---+-------------+-------------+
| a| b| tender_type|tender_amount|
+---+---+-------------+-------------+
| 1| 2| VISA| 4|
| 1| 2| VISA| 5|
| 1| 2| Mastercard| 6|
| 1| 2| AMEX| 7|
| 1| 2| Cash| 100|
| 1| 2|Weird - CAD $| 101|
+---+---+-------------+-------------+
== Optimized Logical Plan ==
Project [a#712012L, b#712013L, map(keys: [cash,sbux card,na,weird - cad $,weird - usd $,amex,mastercard,visa,discover,debit,debit/credit], values: [Cash,SBUX Card,NA,Weird - CAD $,Weird - USD $,AMEX,Mastercard,VISA,Discover,Debit,Debit/Credit])[lower(rtrim(ltrim(tender_type#712014, None), None))] AS tender_type#712590, tender_amount#712015L], Statistics(sizeInBytes=8.0 EiB)
+- LogicalRDD [a#712012L, b#712013L, tender_type#712014, tender_amount#712015L], false, Statistics(sizeInBytes=8.0 EiB)
== Physical Plan ==
*(1) Project [a#712012L, b#712013L, map(keys: [cash,sbux card,na,weird - cad $,weird - usd $,amex,mastercard,visa,discover,debit,debit/credit], values: [Cash,SBUX Card,NA,Weird - CAD $,Weird - USD $,AMEX,Mastercard,VISA,Discover,Debit,Debit/Credit])[lower(rtrim(ltrim(tender_type#712014, None), None))] AS tender_type#712590, tender_amount#712015L]
+- *(1) Scan ExistingRDD[a#712012L,b#712013L,tender_type#712014,tender_amount#712015L]
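To make the trade-off mentioned at the start of this option concrete: the map lookup has no fallback, so a tender type missing from PAYMENT_TYPES (for example a hypothetical 'Check') would become null, whereas the loop in option 1 leaves it untouched. The plain-Python sketch below mimics both behaviors with a dict, using None in place of null; `map_only` and `map_with_fallback` are illustrative names. In Spark, wrapping the lookup in `f.coalesce(...)` together with the trimmed original column would be one way to get the fallback behavior.

```python
PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $',
                 'AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']

# Lower-case key -> canonical spelling, the same pairs passed to create_map
LOOKUP = {pt.lower(): pt for pt in PAYMENT_TYPES}


def map_only(value):
    """Option 2 style: trim, lower-case, look up. A missing key yields None (null in Spark)."""
    return LOOKUP.get(value.strip().lower())


def map_with_fallback(value):
    """Coalesce style: use the lookup when it hits, otherwise keep the trimmed original."""
    trimmed = value.strip()
    mapped = LOOKUP.get(trimmed.lower())
    return mapped if mapped is not None else trimmed


for raw in ['Amex', ' visa ', 'Check']:
    print(repr(raw), '->', map_only(raw), '/', map_with_fallback(raw))
```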
Option 3 - Programmatically create case statements
This approach still relies on lower-casing for a case-insensitive comparison, but unlike option 2 it keeps the current value whenever there is no match.
from functools import reduce

# Trim leading and trailing whitespace once up front
df5 = df.withColumn(TENDER_TYPE, f.ltrim(f.col(TENDER_TYPE))) \
    .withColumn(TENDER_TYPE, f.rtrim(f.col(TENDER_TYPE)))

def add_column_safely(df, payment_type):
    # Use payment_type on a case-insensitive match; otherwise keep the current value
    return df.withColumn(TENDER_TYPE, f.when(f.lower(f.col(TENDER_TYPE)) == payment_type.lower(), f.lit(payment_type)) \
        .otherwise(f.col(TENDER_TYPE)))

# Thread the dataframe through add_column_safely once per payment type
df6 = reduce(add_column_safely, PAYMENT_TYPES, df5)
df6.display()
df6.explain(mode="cost")
Yields:
+---+---+-------------+-------------+
| a| b| tender_type|tender_amount|
+---+---+-------------+-------------+
| 1| 2| VISA| 4|
| 1| 2| VISA| 5|
| 1| 2| Mastercard| 6|
| 1| 2| AMEX| 7|
| 1| 2| Cash| 100|
| 1| 2|Weird - CAD $| 101|
+---+---+-------------+-------------+
== Optimized Logical Plan ==
Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713587) = debit/credit) THEN Debit/Credit ELSE tender_type#713587 END AS tender_type#713592, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713582) = debit) THEN Debit ELSE tender_type#713582 END AS tender_type#713587, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713577) = discover) THEN Discover ELSE tender_type#713577 END AS tender_type#713582, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713572) = visa) THEN VISA ELSE tender_type#713572 END AS tender_type#713577, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713567) = mastercard) THEN Mastercard ELSE tender_type#713567 END AS tender_type#713572, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713562) = amex) THEN AMEX ELSE tender_type#713562 END AS tender_type#713567, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713557) = weird - usd $) THEN Weird - USD $ ELSE tender_type#713557 END AS tender_type#713562, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713552) = weird - cad $) THEN Weird - CAD $ ELSE tender_type#713552 END AS tender_type#713557, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713547) = na) THEN NA ELSE tender_type#713547 END AS tender_type#713552, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713542) = cash) THEN Cash ELSE tender_type#713542 END AS tender_type#713547, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- Project [a#712760L, b#712761L, rtrim(ltrim(tender_type#712762, None), None) AS tender_type#713542, tender_amount#712763L], Statistics(sizeInBytes=8.0 EiB)
+- LogicalRDD [a#712760L, b#712761L, tender_type#712762, tender_amount#712763L], false, Statistics(sizeInBytes=8.0 EiB)
== Physical Plan ==
*(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713587) = debit/credit) THEN Debit/Credit ELSE tender_type#713587 END AS tender_type#713592, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713582) = debit) THEN Debit ELSE tender_type#713582 END AS tender_type#713587, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713577) = discover) THEN Discover ELSE tender_type#713577 END AS tender_type#713582, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713572) = visa) THEN VISA ELSE tender_type#713572 END AS tender_type#713577, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713567) = mastercard) THEN Mastercard ELSE tender_type#713567 END AS tender_type#713572, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713562) = amex) THEN AMEX ELSE tender_type#713562 END AS tender_type#713567, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713557) = weird - usd $) THEN Weird - USD $ ELSE tender_type#713557 END AS tender_type#713562, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713552) = weird - cad $) THEN Weird - CAD $ ELSE tender_type#713552 END AS tender_type#713557, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713547) = na) THEN NA ELSE tender_type#713547 END AS tender_type#713552, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, CASE WHEN (lower(tender_type#713542) = cash) THEN Cash ELSE tender_type#713542 END AS tender_type#713547, tender_amount#712763L]
+- *(1) Project [a#712760L, b#712761L, rtrim(ltrim(tender_type#712762, None), None) AS tender_type#713542, tender_amount#712763L]
+- *(1) Scan ExistingRDD[a#712760L,b#712761L,tender_type#712762,tender_amount#712763L]
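Each reduce step adds one withColumn, and therefore one Project, per payment type; unlike option 1, Catalyst did not collapse these, as the plan above shows. The plain-Python sketch below (no Spark) mimics that threading on a single value. `apply_case` is a hypothetical stand-in for add_column_safely, and it records each step so the one-step-per-element structure is visible.

```python
from functools import reduce

PAYMENT_TYPES = ['Cash', 'SBUX Card', 'NA', 'Weird - CAD $', 'Weird - USD $',
                 'AMEX', 'Mastercard', 'VISA', 'Discover', 'Debit', 'Debit/Credit']


def apply_case(state, payment_type):
    """Hypothetical stand-in for one CASE WHEN ... ELSE ... END projection."""
    value, steps = state
    # Replace on a case-insensitive match, otherwise keep the current value (the otherwise branch)
    new_value = payment_type if value.lower() == payment_type.lower() else value
    return new_value, steps + [payment_type]


value, steps = reduce(apply_case, PAYMENT_TYPES, ('amex', []))
print(value)       # AMEX
print(len(steps))  # 11: one step per payment type
```

If the nested projections become a problem, one alternative would be a single expression built by chaining `Column.when` calls, such as `f.when(cond1, v1).when(cond2, v2).otherwise(f.col(TENDER_TYPE))`, which should yield one CASE WHEN with several branches; we have not benchmarked that approach here.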
Execution Plans
Optimized Logical Plan
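The optimized logical plan describes the query as logical operations such as filters, aggregations, and projections, after the Catalyst optimizer has applied rule-based rewrites such as re-ordering operations or collapsing adjacent projections. It is then used to create the physical plan.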
Physical Plan
From the optimized logical plan, Spark generates a physical plan that describes how the query will actually be executed on the cluster. The Catalyst optimizer can generate several physical plans using different strategies and selects one of them to execute using a cost model. This is where you will see Exchange (shuffle) operators and low-level scans such as Scan ExistingRDD. The *(1) prefix marks operators that are compiled together in the same whole-stage code generation stage.
Summary
Judging by the execution plans, options 1 and 2 don't look very different: in both cases Catalyst collapses the chained withColumn calls into a single Project over the scan, although option 1's expression nests an ltrim, rtrim, and regexp_replace for every payment type. I try to avoid loops when working with Spark, but in this case the Catalyst optimizer does a good job of producing the same kind of plan as if we hadn't used a loop. You should still be careful about what you are looping over and what you are trying to accomplish, and examine the execution plans to spot bottlenecks. Option 3 runs without error in the notebook, but with a large number of values in PAYMENT_TYPES and larger input dataframes I ran into some issues; note that its plan keeps one nested Project per payment type instead of collapsing them. Originally I wanted to avoid lower-casing the column at all, but the impact wasn't terrible.
References
https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.create_map.html
https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html
https://medium.com/datalex/sparks-logical-and-physical-plans-when-why-how-and-beyond-8cd1947b605a
https://medium.com/@david.mudrauskas/looping-over-spark-an-antipattern-e10ac54824a0