Wednesday, December 26, 2018

Spark JDBC vs Sqoop (Use Spark JDBC)

In one of my recent projects at Core Compete, which involved data warehouse modernization and transitioning the customer's data warehouse from on-premises to the cloud, data ingestion was a key component: creating a data lake on HDFS. The underlying platform is HDP. Various high-performance data transforms were developed using PySpark to transform data read from the data lake. A custom tool was built to orchestrate incremental and full data loads, as described in this page with some variation (Apache Airflow is preferred). This article focuses on my experience using Spark JDBC to enable data ingestion; the same approach could be used for a cloud data warehouse migration.

Sqoop (http://sqoop.apache.org/) is a popular tool used to extract data in bulk from a relational database to HDFS. Sqoop is a wrapper around a JDBC process. Using Sqoop, we ran into a few minor issues:

  1. The version we used did not support the ORC format.
  2. Timestamps needed further data processing.
  3. An additional step was needed to convert from Avro to ORC.
While the above issues were not big obstacles, the key issue was having a separate process. The most efficient approach would be a data ingest process that is more integrated with the data transforms developed in Spark, one that could leverage the data while it is in memory to apply additional transforms such as a Type 2 SCD (slowly changing dimension). The team at Core Compete designed a reusable framework that downloads data efficiently and in parallel, lays the data down on HDFS using a partitioning strategy that helps accelerate the business transforms, and applies a Type 2 SCD if the table needs one.


It is very important to remember that Spark JDBC/Sqoop will not be comparable in performance to a native database solution such as TPT (Teradata Parallel Transporter) for Teradata, so those native options need to be considered and evaluated as well.

It is also very important to understand the different Spark JDBC parameters, and what each of them means, when using the load function in Spark.

A load statement will look like this (an illustration in PySpark):

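#df1 (jdbc_url, driver_name, user and password are placeholders for your connection settings)
df = spark.read.format("jdbc")\
.option("url", jdbc_url)\
.option("driver", driver_name)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "(select * from dm.employee) as emp")\
.load()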

Note: The dbtable option can also be a SQL query, not necessarily a table. Make sure you enclose the query in parentheses and give it an alias.
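A small helper can take care of that wrapping. This is a minimal sketch; `as_dbtable` is a hypothetical name, not part of Spark. It also strips a trailing semicolon, which breaks the statement once it is embedded as a subquery, and it omits the `AS` keyword before the alias because some databases (Oracle, for example) reject `AS` for table aliases.

```python
def as_dbtable(sql: str, alias: str) -> str:
    """Wrap a SELECT statement so it can be passed as the JDBC "dbtable" option.

    as_dbtable is a hypothetical helper, not a Spark API.
    """
    # Remove surrounding whitespace and any trailing semicolon.
    body = sql.strip().rstrip(";").strip()
    if not body:
        raise ValueError("empty SQL statement")
    if not alias.isidentifier():
        raise ValueError(f"invalid alias: {alias!r}")
    # "(<query>) alias": a parenthesized subquery with a plain alias (no AS).
    return f"({body}) {alias}"
```

For example, `as_dbtable("select * from dm.employee;", "emp")` returns `(select * from dm.employee) emp`, which can be passed straight to `.option("dbtable", ...)`.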

The #df1 statement above opens a single connection to the database to extract all of the data, which can be very slow.

So how do we change that?

  1. Analyze the table, or the data, being extracted.
  2. Look for a key, like employee_id, whose values are evenly spread across its range, i.e. a key that ensures the data is not skewed.
  3. Once the key is identified, find its lower and upper bounds; for example, the first employee id is 1 and the max employee id is 100.
  4. Set these values as the lowerBound and upperBound options below. The bounds do not filter rows: they only decide how the key range is split, and rows outside the range still land in the first or last partition.
  5. Set the partitionColumn to be the key (employee_id).
  6. numPartitions determines two things:
    1. The maximum number of parallel JDBC connections that will be opened.
    2. The number of Spark partitions in the resulting DataFrame, and hence the number of files written to HDFS (unless the data is repartitioned).
  7. Be careful that the database can handle these concurrent connections; check with your DBA.
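Steps 2, 6 and 7 can be checked before running the extract. The sketch below assumes you can pull a sample of key values (for example with a small JDBC query) and that your DBA has given you a connection limit. `stride_counts`, `skew_ratio` and `choose_num_partitions` are hypothetical helpers, the sizing rule is a heuristic rather than anything Spark prescribes, and the equal-width split only approximates the one Spark performs.

```python
import math
from typing import Iterable, List


def stride_counts(keys: Iterable[int], lower: int, upper: int, num_partitions: int) -> List[int]:
    """Count sampled key values per equal-width slice of [lower, upper].

    Approximates Spark's split: values below `lower` land in the first slice
    and values above `upper` in the last one.
    """
    width = (upper - lower) / num_partitions
    counts = [0] * num_partitions
    for key in keys:
        idx = int((key - lower) // width) if width > 0 else 0
        # Clamp out-of-range keys into the first/last slice.
        counts[min(max(idx, 0), num_partitions - 1)] += 1
    return counts


def skew_ratio(counts: List[int]) -> float:
    """Largest slice divided by the average slice; 1.0 means perfectly even."""
    mean = sum(counts) / len(counts)
    return max(counts) / mean if mean else 0.0


def choose_num_partitions(row_count: int, rows_per_partition: int, max_connections: int) -> int:
    """Heuristic (an assumption): enough partitions for the target partition
    size, but never more than the DBA-approved number of connections."""
    if row_count <= 0:
        return 1
    wanted = math.ceil(row_count / rows_per_partition)
    return max(1, min(wanted, max_connections))
```

A skew ratio well above 1 (for instance, most sampled ids falling in the first slice) suggests picking a different key, since one task would end up doing most of the work.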


#df2
df = spark.read.format("jdbc")\
.option("url", jdbc_url)\
.option("driver", driver_name)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "(select * from dm.employee) as emp")\
.option("numPartitions", 20)\
.option("partitionColumn", "employee_id")\
.option("lowerBound", 1)\
.option("upperBound", 20000)\
.load()


Note: The above statement fires up to 20 concurrent queries to extract data from the employee table (the actual concurrency is also capped by the executor cores available). Each query has a WHERE clause appended to the end.



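Spark internally splits the query roughly like this (the exact boundaries can differ between Spark versions):

select * from (<sql>) where employee_id < 1001 or employee_id is null --> task 1
select * from (<sql>) where employee_id >= 1001 and employee_id < 2001 --> task 2
and so on, until task 20:
select * from (<sql>) where employee_id >= 19001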
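To preview these clauses without touching the database, the stride logic can be reproduced in plain Python. This is a sketch modeled on how Spark 2.x computes the stride for a partitioned JDBC read; newer Spark versions shift the boundaries slightly, so treat the output as an approximation. `preview_partitions` is a hypothetical helper. It also shows that lowerBound and upperBound do not filter rows: the first clause is open downward (and picks up NULL keys) and the last one is open upward.

```python
from typing import List, Optional


def _div_trunc(a: int, b: int) -> int:
    """Integer division truncating toward zero, like Scala/Java Long division."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b >= 0) else -q


def preview_partitions(column: str, lower: int, upper: int, num_partitions: int) -> List[Optional[str]]:
    """Approximate the WHERE clause of each partition query (Spark 2.x style).

    A single None entry means one query with no WHERE clause.
    """
    if num_partitions <= 1 or lower == upper:
        return [None]
    if lower > upper:
        raise ValueError("lowerBound must not exceed upperBound")
    # Spark uses fewer partitions when the key range is narrower than requested.
    num_partitions = min(num_partitions, upper - lower)
    stride = _div_trunc(upper, num_partitions) - _div_trunc(lower, num_partitions)

    clauses: List[Optional[str]] = []
    current = lower
    for i in range(num_partitions):
        lo = f"{column} >= {current}" if i != 0 else None
        current += stride
        hi = f"{column} < {current}" if i != num_partitions - 1 else None
        if hi is None:
            clauses.append(lo)  # last partition: no upper limit
        elif lo is None:
            clauses.append(f"{hi} or {column} is null")  # first partition also gets NULLs
        else:
            clauses.append(f"{lo} AND {hi}")
    return clauses
```

With the #df2 settings, `preview_partitions("employee_id", 1, 20000, 20)` returns 20 clauses, from `employee_id < 1001 or employee_id is null` to `employee_id >= 19001`.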

Almost there. We might still have a problem: what happens if the lower and upper bounds are dynamic, i.e. the employee ids are not static?

How about we first identify the min and max values with a query like the following, which becomes the value of the "dbtable" option in a #df1-style read:
 (select max(employee_id) max_val, min(employee_id) min_val from <sql>) t

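bounded_df = spark.read.format("jdbc")\
.option("url", jdbc_url)\
.option("driver", driver_name)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "(select max(employee_id) max_val, min(employee_id) min_val from (select * from dm.employee) as emp) t")\
.load()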

bounded_values = bounded_df.collect()[0]
lower_bound = bounded_values["min_val"]
upper_bound = bounded_values["max_val"]

Use these values in #df2 for upper and lower bounds:


df = spark.read.format("jdbc")\
.option("url", jdbc_url)\
.option("driver", driver_name)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "(select * from dm.employee) as emp")\
.option("numPartitions", 20)\
.option("partitionColumn", "employee_id")\
.option("lowerBound", lower_bound)\
.option("upperBound", upper_bound)\
.load()
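The two round trips above (a min/max query, then the partitioned read) can be wrapped in small helpers. This is a minimal sketch assuming a numeric key; `bounds_query` and `partition_options` are hypothetical names. It also covers two cases the snippets above leave open: an empty source table, where MIN and MAX come back as NULL (None in Python), and a table where every row has the same key. In both cases it returns no partitioning options, so the read falls back to a single query.

```python
from typing import Any, Dict


def bounds_query(key: str, source: str, alias: str = "t") -> str:
    """Build a dbtable subquery returning max_val/min_val for `key`.

    `source` is assumed to be a parenthesized, aliased subquery such as
    "(select * from dm.employee) as emp".
    """
    return f"(select max({key}) max_val, min({key}) min_val from {source}) {alias}"


def partition_options(row: Any, key: str, num_partitions: int) -> Dict[str, Any]:
    """Turn the collected min/max row into JDBC partitioning options.

    `row` can be a pyspark Row or a plain dict. Returns {} when a single,
    unpartitioned query is the only sensible choice.
    """
    lower, upper = row["min_val"], row["max_val"]
    if lower is None or upper is None:  # empty table: MIN/MAX are NULL
        return {}
    if lower == upper or num_partitions <= 1:  # nothing to split
        return {}
    return {
        "partitionColumn": key,
        # int() also handles DECIMAL keys, which arrive as decimal.Decimal
        "lowerBound": int(lower),
        "upperBound": int(upper),
        "numPartitions": num_partitions,
    }
```

The returned dict can be merged into the reader with `.options(**partition_options(bounded_df.collect()[0], "employee_id", 20))`, since `DataFrameReader.options` accepts keyword arguments.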




With this approach, Spark JDBC can be used as a framework to ingest data.

The entire data ingest process was developed as a component driven by configuration that identifies all the meta information (table, partitioning keys, data partition key on HDFS, Type 2 SCD). Having a component like this accelerates the creation of a data lake!

The target system can be HDFS, S3 or GCS.
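A configuration entry for such a component might look like the sketch below. The field names, the `plan` function and the example values (including the hire_year column and the target paths) are hypothetical; they only illustrate how the table, split key, HDFS partition key, Type 2 SCD flag and target location could drive one generic extract routine. The bounds are deliberately absent because they are resolved at run time with a min/max query.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class TableConfig:
    """One entry of a hypothetical ingestion config (e.g. loaded from YAML or JSON)."""
    table: str                        # source table, e.g. "dm.employee"
    split_key: Optional[str]          # JDBC partitionColumn; None -> single connection
    num_partitions: int               # max concurrent JDBC connections for this table
    hdfs_partition_by: Optional[str]  # column used to lay the data out on the target
    scd_type2: bool                   # apply a Type 2 SCD after the extract
    target_path: str                  # e.g. an hdfs://, s3a:// or gs:// location


def plan(cfg: TableConfig) -> Dict[str, Any]:
    """Translate one config entry into read options, write settings and steps."""
    read: Dict[str, Any] = {"dbtable": f"(select * from {cfg.table}) src"}
    if cfg.split_key and cfg.num_partitions > 1:
        read["partitionColumn"] = cfg.split_key
        read["numPartitions"] = cfg.num_partitions
        # lowerBound/upperBound are filled in at run time from a min/max query.
    write = {"path": cfg.target_path, "partitionBy": cfg.hdfs_partition_by}
    steps = ["extract"] + (["apply_scd_type2"] if cfg.scd_type2 else []) + ["write"]
    return {"read": read, "write": write, "steps": steps}
```

Adding a table to the data lake then means adding a config entry rather than writing new extract code.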

Other things to consider as part of the data ingest process, which we address for our customers as reusable components:
  1. Idempotency is a very important attribute that all modules need to adhere to, especially when extracting data onto HDFS. It needs to be incorporated into the design: what happens when we run an incremental load twice? Does the data get corrupted with duplicates? An idempotent design ensures the data is not corrupted.
  2. Data validation from the source data warehouse to HDFS is needed to ensure the data is consistent. Numerical and statistical validation, including sampling techniques, needs to be built.
  3. Type 2 SCD: in this specific scenario it was a fast-changing dimension, so we had to come up with an approach to do this in parallel and efficiently in Spark.
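For the idempotency point, one common pattern (an assumption here, not necessarily how the framework above does it) is to replace whole target partitions instead of appending rows. In Spark this corresponds to writing with `mode("overwrite")` and `partitionBy(...)` while `spark.sql.sources.partitionOverwriteMode` is set to `dynamic` (available since Spark 2.3), so only the partitions present in the batch are replaced. The pure-Python model below uses hypothetical function names and a dict standing in for the partitioned target; it shows why rerunning the same load is then harmless, while a naive append duplicates rows.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]
Store = Dict[Any, List[Row]]  # partition value -> rows in that partition


def overwrite_partitions(store: Store, batch: List[Row], partition_key: str) -> Store:
    """Replace every partition present in `batch`; leave the others untouched.

    Models dynamic partition overwrite: rerunning the same batch yields the
    same store, which is what makes the load idempotent.
    """
    incoming: Dict[Any, List[Row]] = defaultdict(list)
    for row in batch:
        incoming[row[partition_key]].append(row)
    result = dict(store)
    result.update(incoming)  # whole partitions are swapped, never appended to
    return result


def append_rows(store: Store, batch: List[Row], partition_key: str) -> Store:
    """Naive append: rerunning the same batch duplicates its rows."""
    result = {k: list(v) for k, v in store.items()}
    for row in batch:
        result.setdefault(row[partition_key], []).append(row)
    return result
```

Running `overwrite_partitions` twice with the same batch leaves the store unchanged, whereas two `append_rows` calls double the rows in every partition the batch touches.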
