How smart data sources work internally

JDBC stands for Java Database Connectivity. In the context of Apache Spark, the term can cause confusion because it refers to two different things: using a JDBC-accessible database as a data source for Apache Spark, and Apache Spark's capability to serve as a JDBC-compliant data source for other systems. The latter is not covered further in this book. The former serves here as one particular example in which the data source (in this case, a relational database) transparently takes over part of the data pre-processing without the user of Apache SparkSQL noticing it.

If you want to use Apache SparkSQL as a data source for other Java/JVM-based applications, you have to start the JDBC Thrift server, as explained here: https://developer.ibm.com/hadoop/2016/08/22/how-to-run-queries-on-spark-sql-using-jdbc-via-thrift-server/. The following website explains how to connect to a MySQL database from Apache SparkSQL using JDBC: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html. More on JDBC in general can be found here: http://www.oracle.com/technetwork/java/overview-141217.html#2.

Now we want to show how Apache Spark internally implements smart data sources, using a filter pushed down to a JDBC data source as the example. This means that if data has to be filtered in a physical execution plan (PEP), the filter is executed as part of the SQL statement on the RDBMS when the underlying data is read. This way, reading unnecessary data is avoided. Depending on the selectivity of the filter predicate, the performance gain can be multiple orders of magnitude.
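To make the effect of a pushed-down filter tangible, here is a minimal toy sketch that does not use Apache Spark at all. The names PushDownSketch, Order, ToyTable, scanAll, and scanWhere are hypothetical and exist only for this illustration. The sketch counts how many rows leave the simulated database when the predicate is applied by the caller compared to when it is applied at the source:

```scala
// Toy model, not Spark code: every name in this object is hypothetical.
object PushDownSketch {
  final case class Order(id: Int, country: String, amount: Double)

  // Stand-in for a remote table; counts how many rows leave the "database".
  final class ToyTable(rows: Seq[Order]) {
    var rowsTransferred: Long = 0L

    // Without push-down: every row is shipped to the caller, who filters later.
    def scanAll(): Iterator[Order] =
      rows.iterator.map { r => rowsTransferred += 1; r }

    // With push-down: the predicate runs at the source, only matches are shipped.
    def scanWhere(predicate: Order => Boolean): Iterator[Order] =
      rows.iterator.filter(predicate).map { r => rowsTransferred += 1; r }
  }

  // Generates n orders, of which every hundredth one comes from "CH".
  def sampleOrders(n: Int): Seq[Order] =
    (1 to n).map(i => Order(i, if (i % 100 == 0) "CH" else "DE", i.toDouble))
}
```

With 1,000 sample orders and a predicate that matches one percent of them, scanAll transfers all 1,000 rows before the caller discards 990, whereas scanWhere transfers only the 10 matching rows. The more selective the predicate, the larger this difference becomes.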

Let's have a look at the following trait (here's the link to the source code https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala):

trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

This trait defines a method called buildScan, which takes two parameters: first, an array of the column names that have to be included in the result, and second, an array of filter objects expressing the query predicate pushed down to the underlying smart data source. The return type is RDD[Row]. One example is implemented in the JDBCRelation class (again, here is the link to the source code https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala):

private[sql] case class JDBCRelation(
    parts: Array[Partition], jdbcOptions: JDBCOptions)(@transient val sparkSession: SparkSession)
  extends BaseRelation
  with PrunedFilteredScan
  with InsertableRelation {

JDBCRelation implements the buildScan method in the following way:

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
  JDBCRDD.scanTable(
    sparkSession.sparkContext,
    schema,
    requiredColumns,
    filters,
    parts,
    jdbcOptions).asInstanceOf[RDD[Row]]
}

As we can see, this method is just a delegate. Therefore, we have to have a look at the scanTable method of the JDBCRDD class. It is interesting to note that, in the case of JDBCRDD, support for predicate push-down is implemented directly in the RDD class. We'll skip the scanTable method for now, since it just parameterizes and creates a new JDBCRDD object. The most interesting method in JDBCRDD is compute, which it inherits from the abstract RDD class. Through the compute method, Apache Spark tells this RDD to get out of lazy mode and materialize itself whenever it is appropriate during the computation of a data processing job. We'll show you two important fragments of this method after we have had a look at the method signature:

override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {

Here you can see that the return type is Iterator[InternalRow], which allows a lazy underlying data source to be read lazily as well. This is exactly what this particular implementation does:

val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
rs = stmt.executeQuery()

Note that the SQL statement created and stored in the sqlText constant references two interesting variables: columnList and myWhereClause. Both are derived from the requiredColumns and filters arguments passed to the buildScan method of the JDBCRelation class.
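The derivation of columnList and myWhereClause can be pictured with the following simplified sketch. It is not the code from JDBCRDD: ToyFilter and its three subclasses are hypothetical stand-ins for the filter classes in org.apache.spark.sql.sources, and compile and buildQuery are names chosen for this illustration only. The sketch also ignores identifier quoting, NULL handling, and database dialects, all of which a real implementation has to deal with:

```scala
// Simplified sketch: these filter types are hypothetical stand-ins,
// not the classes from org.apache.spark.sql.sources.
object WhereClauseSketch {
  sealed trait ToyFilter
  final case class EqualTo(attribute: String, value: Any) extends ToyFilter
  final case class GreaterThan(attribute: String, value: Any) extends ToyFilter
  final case class IsNotNull(attribute: String) extends ToyFilter

  // Renders a literal; strings are quoted and embedded quotes are doubled.
  private def literal(value: Any): String = value match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  // Translates one filter object into a SQL fragment.
  def compile(filter: ToyFilter): String = filter match {
    case EqualTo(a, v)     => s"$a = ${literal(v)}"
    case GreaterThan(a, v) => s"$a > ${literal(v)}"
    case IsNotNull(a)      => s"$a IS NOT NULL"
  }

  // Builds the full statement from the pruned columns and the pushed-down filters.
  def buildQuery(
      table: String,
      requiredColumns: Array[String],
      filters: Array[ToyFilter]): String = {
    // "1" is a placeholder that keeps the statement valid when no column is needed.
    val columnList =
      if (requiredColumns.isEmpty) "1" else requiredColumns.mkString(", ")
    // All filters are combined with AND; no filters means no WHERE clause at all.
    val whereClause =
      if (filters.isEmpty) ""
      else filters.map(f => s"(${compile(f)})").mkString(" WHERE ", " AND ", "")
    s"SELECT $columnList FROM $table$whereClause"
  }
}
```

For example, requesting the columns id and amount from a table called orders with the filters EqualTo("country", "CH") and GreaterThan("amount", 100) yields the statement SELECT id, amount FROM orders WHERE (country = 'CH') AND (amount > 100), so both column pruning and row filtering happen inside the database.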

Therefore, this data source can be called a smart source, because the underlying storage technology (an SQL database in this case) can be told to return only the columns and rows that are actually requested. And as already mentioned, the data source also passes the lazy data access pattern on to the underlying database. Here you can see that the JDBC result set is wrapped into a typed iterator, Iterator[InternalRow]. Since this matches the return type of the compute method, we are done upon execution of the following code:

val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
CompletionIterator[InternalRow, Iterator[InternalRow]](rowsIterator, close())

Note that the ResultSet rs obtained from the JDBC database query is passed to (wrapped into) a delegate object. Therefore, the JDBC connection stays open, and the RDBMS cursor isn't destroyed and can be used to return subsequent data once Apache Spark requests it.
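The pattern of a lazily consumed cursor that is closed only once all rows have been read can be sketched without Apache Spark or a database. The following is a simplified stand-in for the idea, not Spark's CompletionIterator itself: ClosingIterator and ToyCursor are hypothetical names, and the sketch assumes that running the clean-up callback the first time hasNext returns false is sufficient:

```scala
// Toy model of a lazily read cursor with a completion callback.
// All names are hypothetical; this is not Spark's CompletionIterator.
object LazyCursorSketch {
  // Wraps an iterator and runs onComplete exactly once,
  // the first time the wrapped iterator reports that it is exhausted.
  final class ClosingIterator[A](underlying: Iterator[A], onComplete: () => Unit)
      extends Iterator[A] {
    private var completed = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !completed) {
        completed = true
        onComplete()
      }
      more
    }

    override def next(): A = underlying.next()
  }

  // Stand-in for a database cursor that records how many rows were fetched.
  final class ToyCursor(values: Seq[Int]) {
    var fetched = 0
    var closed = false

    // Rows are produced one at a time, only when the consumer asks for them.
    def rows: Iterator[Int] = values.iterator.map { v => fetched += 1; v }

    def close(): Unit = { closed = true }
  }
}
```

Wrapping cursor.rows in a ClosingIterator with the callback () => cursor.close() fetches nothing up front. Each call to next() pulls exactly one row, and close() runs only after the consumer has drained the iterator, which mirrors why the connection and cursor have to stay open in the meantime.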
