Chapter 8. Testing

Writing effective software without tests is quite challenging. Effective testing can greatly improve developer productivity, especially for systems with slow end-to-end running times, such as distributed systems. However, this chapter isn't going to try to convince you that you should be testing; if you really want to ride without a seat belt, that's fine too.

Testing in Java and Scala

For the sake of simplicity, this chapter will look at using ScalaTest and JUnit as the testing libraries. ScalaTest can be used to test both Scala and Java code and is the testing library currently used in Spark. JUnit is a popular testing framework for Java.
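
Both libraries can be pulled in through sbt. The following is only a minimal sketch of the dependency settings; the version numbers are illustrative, so pick ones that match your Scala and Spark builds:

// In build.sbt: test-only dependencies for ScalaTest and JUnit.
// junit-interface lets sbt discover and run JUnit tests.
libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "1.9.1" % "test",
  "junit" % "junit" % "4.11" % "test",
  "com.novocode" % "junit-interface" % "0.10" % "test"
)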

Refactoring your code for testability

If you have code that can be isolated from the RDD or SparkContext interaction, that code can be tested using standard methodologies. While it can be quite convenient to use anonymous functions when writing Spark code, giving them names lets you test them more easily without the expensive overhead of setting up a SparkContext. For example, in your Scala CSV parser, you might have had this hard-to-test code:

  val splitLines = inFile.map(line => {
    val reader = new CSVReader(new StringReader(line))
    reader.readNext().map(_.toDouble)
  })

Or, in Java, you might have had:

JavaRDD<Integer[]> splitLines = inFile.flatMap(
  new FlatMapFunction<String, Integer[]>() {
    public Iterable<Integer[]> call(String line) {
      ArrayList<Integer[]> result = new ArrayList<Integer[]>();
      try {
        CSVReader reader = new CSVReader(new StringReader(line));
        String[] parsedLine = reader.readNext();
        Integer[] intLine = new Integer[parsedLine.length];
        for (int i = 0; i < parsedLine.length; i++) {
          intLine[i] = Integer.parseInt(parsedLine[i]);
        }
        result.add(intLine);
      } catch (Exception e) {
        // errors is an accumulator defined elsewhere in the program
        errors.add(1);
      }
      return result;
    }
  });

Instead, in Scala, you could pull the parsing logic out of the CSV parser into a separate, named function:

def parseLine(line: String): Array[Double] = {
  val reader = new CSVReader(new StringReader(line))
  reader.readNext().map(_.toDouble)
}

In Java, you could add this:

public class JavaLoadCsvTestable {
  public static class ParseLine extends Function<String, Integer[]> {
    public Integer[] call(String line) throws Exception {
      CSVReader reader = new CSVReader(new StringReader(line));
      String[] parsedLine = reader.readNext();
      Integer[] intLine = new Integer[parsedLine.length];
      for (int i = 0; i < parsedLine.length; i++) {
        intLine[i] = Integer.parseInt(parsedLine[i]);
      }
      return intLine;
    }
  }
}

You can then test the Scala code without having to worry about any Spark-specific setup or logic:

package pandaspark.examples

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

class TestableLoadCsvExampleSuite extends FunSuite with ShouldMatchers {
    test("should parse a csv line with numbers") {
      TestableLoadCsvExample.parseLine("1,2") should equal(Array[Double](1.0,2.0))
      TestableLoadCsvExample.parseLine("100,-1,1,2,2.5")should equal (Array[Double](100,-1,1.0,2.0,2.5))
    }
    test("should error if there is a non-number") {
      evaluating {
        TestableLoadCsvExample.parseLine("pandas")
      } should produce [NumberFormatException]
    }
}

Or, to test the Java code, you could write something like:

class JavaLoadCsvExampleSuite extends FunSuite with ShouldMatchers {

    test("should parse a csv line with numbers") {
      val parseLine = new JavaLoadCsvTestable.ParseLine();
      parseLine.call("1,2") should equal (Array[Integer](1,2))
      parseLine.call("100,-1,1,2,2") should equal (Array[Integer](100,-1,1,2,2))
    }
    test("should error if there is a non-integer") {
      val parseLine = new JavaLoadCsvTestable.ParseLine();
      evaluating { parseLine.call("pandas")  } should produce [NumberFormatException]
      evaluating {parseLine.call("100,-1,1,2.2,2") should equal (Array[Integer](100,-1,1,2,2)) } should produce [NumberFormatException]
    }
}

Note that the test is still written in Scala; don't worry, we will look at JUnit tests later.

Testing interactions with SparkContext

However, you may remember that you later extended the CSV parser to increment counters on invalid input so as to gracefully handle failures. While you could verify that behavior by providing mock counters and other mock objects for the Spark components you are using, that approach restricts you to testing only the parts of the code that don't depend on Spark. Instead, you can refactor the code so that the core logic is testable without Spark, and also write a more complete test using a provided SparkContext, as illustrated in the following example:

Tip

This does have the significant downside of requiring that your tests run serially; otherwise, sbt (or another build infrastructure) may try to launch multiple SparkContexts at the same time, which will cause confusing error messages. You can force tests to execute sequentially in sbt with parallelExecution in Test := false.
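
In a build.sbt file, that setting looks like the following (a minimal sketch):

// Run test suites one at a time so that only a single SparkContext
// exists in the JVM at any given moment.
parallelExecution in Test := false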

object MoreTestableLoadCsvExample {
  def parseLine(line: String): Array[Double] = {
    val reader = new CSVReader(new StringReader(line))
    reader.readNext().map(_.toDouble)
  }
  def handleInput(invalidLineCounter: Accumulator[Int], inFile: RDD[String]): RDD[Double] = {
    val numericData = inFile.flatMap(line => {
      try {
        Some(parseLine(line))
      }
      catch {
        case _ => {
          invalidLineCounter += 1
          None
        }
      }
    })
    numericData.map(row => row.sum)
  }

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: TestableLoadCsvExample<master> <inputfile>")
      System.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val sc = new SparkContext(master, "Load CSV Example", System.getenv("SPARK_HOME"), Seq(System.getenv("JARS")))
    sc.addFile(inputFile)
    val inFile = sc.textFile(inputFile)
    val invalidLineCounter = sc.accumulator(0)
    val summedData = handleInput(invalidLineCounter, inFile)
    println(summedData.collect().mkString(","))
    println("Errors: "+invalidLineCounter)
    println(summedData.stats())
  }

}

We test this with the following code:

import spark._
import spark.SparkContext._
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

class MoreTestableLoadCsvExampleSuite extends FunSuite with ShouldMatchers {
  test("summ data on input") {
    val sc = new SparkContext("local", "Load CSV Example")
    val counter = sc.accumulator(0)
    val input = sc.parallelize(List("1,2","1,3"))
    val result = MoreTestableLoadCsvExample.handleInput(counter,input)
    result.collect() should equal (Array[Double](3.0, 4.0))
    sc.stop()
  }
  test("should parse a csv line with numbers") {
    MoreTestableLoadCsvExample.parseLine("1,2") should equal(Array[Double](1.0,2.0))
    MoreTestableLoadCsvExample.parseLine("100,-1,1,2,2.5")should equal (Array[Double](100,-1,1.0,2.0,2.5))
  }
  test("should error if there is a non-number") {
    evaluating { MoreTestableLoadCsvExample.parseLine("pandas") } should produce [NumberFormatException]
  }
}
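
If you find yourself repeating the SparkContext setup and teardown in every suite, one option is to factor it into a small helper trait. The following is only a sketch, assuming ScalaTest's BeforeAndAfterAll trait is available; the LocalSparkContext name is our own invention:

import org.scalatest.{BeforeAndAfterAll, Suite}
import spark.SparkContext

// Provides a local SparkContext for the duration of a suite and
// stops it once all of the suite's tests have run.
trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
    super.beforeAll()
  }

  override def afterAll() {
    if (sc != null) {
      sc.stop()
    }
    super.afterAll()
  }
}

A suite can then extend FunSuite with LocalSparkContext and use sc directly, which also guarantees that the context is stopped even when a test fails.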

And in Java we use:

public class JavaLoadCsvMoreTestable {
  public static class ParseLineWithAcc extends FlatMapFunction<String, Integer[]> {
    Accumulator<Integer> acc;

    ParseLineWithAcc(Accumulator<Integer> acc) {
      this.acc = acc;
    }

    public Iterable<Integer[]> call(String line) throws Exception {
      ArrayList<Integer[]> result = new ArrayList<Integer[]>();
      try {
        CSVReader reader = new CSVReader(new StringReader(line));
        String[] parsedLine = reader.readNext();
        Integer[] intLine = new Integer[parsedLine.length];
        for (int i = 0; i < parsedLine.length; i++) {
          intLine[i] = Integer.parseInt(parsedLine[i]);
        }
        result.add(intLine);
      } catch (Exception e) {
        // Count the invalid line rather than failing the whole job.
        acc.add(1);
      }
      return result;
    }
  }

  public static JavaDoubleRDD processData(Accumulator<Integer> acc, JavaRDD<String> input) {
    JavaRDD<Integer[]> splitLines = input.flatMap(new ParseLineWithAcc(acc));
    JavaDoubleRDD summedData = splitLines.map(new DoubleFunction<Integer[]>() {
      public Double call(Integer[] in) {
        Double ret = 0.;
        for (int i = 0; i < in.length; i++) {
          ret += in[i];
        }
        return ret;
      }
    });
    return summedData;
  }
}

You can test this in Scala with:

class JavaLoadCsvMoreTestableSuite extends FunSuite with ShouldMatchers {
  test("sum data on input") {
    val sc = new JavaSparkContext("local", "Load Java CSV test")
    val counter: Accumulator[Integer] = sc.intAccumulator(0)
    val input: JavaRDD[String] = sc.parallelize(java.util.Arrays.asList("1,2", "1,3", "murh"))
    val resultRDD = JavaLoadCsvMoreTestable.processData(counter, input)
    resultRDD.cache();
    val resultCount = resultRDD.count()
    val result = resultRDD.collect().toArray()
    resultCount should equal (2)
    result should equal (Array[Double](3.0, 4.0))
    counter.value should equal (1)
    sc.stop()
  }
}

Note that we include an invalid input line ("murh") to exercise the counter.

In Java, using JUnit 4, you can write the following test:

package pandaspark.examples;

import spark.*;
import spark.api.java.JavaSparkContext;
import spark.api.java.JavaRDD;
import spark.api.java.JavaDoubleRDD;

import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
@RunWith(JUnit4.class)
public class JavaLoadCsvMoreTestableSuiteJunit {
    @Test
    public void testSumDataOnInput() {
      JavaSparkContext sc = new JavaSparkContext("local","Load Java CSV test");
      Accumulator<Integer> counter = sc.intAccumulator(0);
      String[] inputArray = {"1,2","1,3","murh"};
      JavaRDD<String> input = sc.parallelize(Arrays.asList(inputArray));
      JavaDoubleRDD resultRDD = JavaLoadCsvMoreTestable.processData(counter, input);
      long resultCount = resultRDD.count();
      assertEquals(2, resultCount);
      int errors = counter.value();
      assertEquals(1, errors);
      sc.stop();
    }
}