Delta Lake Tables to SQL Server

Delta Lake Tables to SQL Server

with JDBC Driver

·

3 min read

Recently tasked with shipping data that we've processed and stored in Delta Lake Tables to a SQL Server instance.

The ID

The ID of the destination table is in bigint while the ID within the delta table is varchar(32). To complicate things the ID column in SQL table is not an identity column nor is the value generated from a sequence object. I won't go into detail on how it was generated.

There are other considerations at play, but we will need to generate a sequential ID based off the maximum current value of the the ID column. We want the functionality similar to the monotonically_increasing_id() Pyspark function, but with a starting value. This function will only generate sequential IDs within the specified dataframe. It is not globally unique across dataframes or Spark Sessions.

This function documentation states that the generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

There is also an impact to performance when calling this function since it has to sequentially generate IDs across all rows of the dataframe across all partitions. In order to achieve sequential generation from a starting seed value, we need to use it in conjunction with the row_count() function.

from pyspark.sql import DataFrame, functions as f

increment: int = 1
# Specify your own config class/dataclass that contains table_name.

# Get the maximum ID from the table
df_max_id = self.jdbc_client \
    .execute_query(f"SELECT MAX({ID}) as Id FROM {config.table_name} WITH(NOLOCK)")

current_id: int = df_max_id.collect()[0][ID]

original_partitions: int = df.rdd.getNumPartitions()
print("Number of partitions for processing executor:", original_partitions)

# Explicilty force single partition processing.
df_single_partition = df.coalesce(1)

# Generates a column of monotonically increasing IDs 
# (64-bit integers or long) based on the default ordering of 
# rows in the DataFrame. This creates a logical sequence 
# that helps define the order of rows within the window.
windowSpec = Window.orderBy(f.monotonically_increasing_id())

# The row_number() assigns a unique sequential integer to each row within 
# the window defined by windowSpec. The ordering of rows is determined 
# by the windowSpec above.
df_with_ids = df_single_partition.withColumn(ID, f.row_number() \
        .over(windowSpec) + current_id + (increment))

# Write the dataframe to SQL Server table.
self.jdbc_client.write(df_with_ids, config.table_name, config.schema, "Append")

The filtered data for processing is fairly small and even without repartitioning may fit in a single partition. The coalesce or repartition functions can be used to explicitly do this. If the dataset is large, then out-of-memory errors could occur depending on you cluster specification.

The JDBC Client

In order to run this locally with the JdbcClient below, you'll need a mssql-jdbc jar which can be downloaded from here . This below configuration for the SparkSession works fine for me locally:

from pyspark.sql import SparkSession

session_builder =  SparkSession.builder.appName(name) \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.cores.max", "8") \
    .config("spark.executor.heartbeatInterval", "3600s") \
    .config("spark.storage.blockManagerSlaveTimeoutMs", "4200s") \
    .config("spark.network.timeout", "4200s") \
    .config("spark.memory.offHeap.enabled", "false") \
    .config("spark.ui.showConsoleProgress", "false") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config('spark.sql.shuffle.partitions', 1) \
    .config('spark.default.parallelism', 1) \
    .config('spark.rdd.compress', "false") \
    .config('spark.shuffle.compress', "false") \
    .config('spark.shuffle.spill.compress', "false") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.memory.offHeap.size", "2g")
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .config("spark.sql.catalog.local", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.extraClassPath", "/path/to/mssql-jdbc-12.6.1.jre11.jar") 

spark_session: SparkSession = configure_spark_with_delta_pip(session_builder).getOrCreate()

The JdbcClient class is a wrapper around the specified JDBC driver. There are several functions that automatically wrap the execution within a transaction, but this can be conditional or additional functions added that forgoes this.

References