Chaining a new RDD with the parent

We first created a MultipliedRDD class. Its constructor takes two parameters:

  • The parent RDD of records, that is, RDD[Record]
  • A multiplier, that is, Double

In our case, there could be a chain of several RDDs, with each RDD wrapping the one before it. So, the RDD we pass in is not necessarily the root of the directed acyclic graph (DAG); it is just the immediate parent. Because we are extending RDD[Record], we need to pass that parent RDD to the superclass constructor.

RDD has many methods, and we can override any of them. Here, we override the compute method to apply the multiplier. The compute method receives a Partition (split) and a TaskContext. Both are passed in by the Spark execution engine, so we don't need to construct them ourselves. However, we must return an iterator whose element type matches the type parameter that we passed to RDD in the inheritance chain. In our case, this is an iterator of Record.

We then delegate to firstParent, which returns the first parent RDD in our chain. Its element type is Record, and we call its iterator method with split and context, where split is the single partition being computed. A Spark RDD is divided into partitions, but here we deal only with the one partition that this call has to compute. Given the partition and the task context, the iterator method knows which values it should return. For every record in that iterator, which is a sales record with an amount and a description, we multiply the amount by the multiplier that was passed to the constructor.
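The per-record step described above can be sketched without Spark at all. In this minimal sketch, a plain Scala Iterator stands in for the iterator that the parent RDD would return for one partition. The Record field names (amount, description) are assumed from the text, and MultiplyIteratorSketch and multiply are hypothetical names used only for illustration:

```scala
// Assumed shape of the record: an amount and a description.
final case class Record(amount: Double, description: String)

object MultiplyIteratorSketch {
  // Stand-in for the body of compute: scale each record's amount and
  // keep its description. Iterator.map is lazy, so a record is only
  // transformed when the caller pulls it from the returned iterator.
  def multiply(records: Iterator[Record], multiplier: Double): Iterator[Record] =
    records.map(r => Record(r.amount * multiplier, r.description))
}
```

Because the result is still an Iterator[Record], the element type that goes in is the same as the element type that comes out, which is what compute has to guarantee.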

By doing this, we have multiplied our amount by the multiplier, and we can then return a new record that holds the old amount multiplied by our multiplier, together with the original description of the sales record.

The second method that we need to override is getPartitions, as we want to keep the partitioning of the parent RDD. If the previous RDD has 100 partitions, for example, we also want our MultipliedRDD to have 100 partitions. So, we want to retain that information about partitions rather than losing it. For this reason, we are just proxying the call to firstParent, which simply returns the partitions of that parent RDD.
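Putting the two overrides together, the class might look roughly like the following. This is a sketch reconstructed from the description, not necessarily the exact listing: the Record field names are assumed, and it needs Spark on the classpath to compile.

```scala
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Assumed shape of the record: an amount and a description.
final case class Record(amount: Double, description: String)

// Passing parent to the RDD constructor registers it as the single
// (one-to-one) dependency of this RDD, which is what firstParent returns.
class MultipliedRDD(parent: RDD[Record], multiplier: Double)
    extends RDD[Record](parent) {

  // Invoked by Spark once per partition when a job runs.
  override def compute(split: Partition, context: TaskContext): Iterator[Record] =
    firstParent[Record]
      .iterator(split, context) // records of this partition from the parent
      .map(r => Record(r.amount * multiplier, r.description))

  // Reuse the parent's partitions so the partition count is preserved.
  override protected def getPartitions: Array[Partition] =
    firstParent[Record].partitions
}
```

Since the constructor accepts any RDD[Record], instances can be chained: wrapping one MultipliedRDD in another makes the inner one the firstParent of the outer one, and both report the partitions of the original RDD.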

In this way, we have created a new MultipliedRDD, which takes the parent and the multiplier. In the test, we collect extendedRdd, call toList, and expect the list to contain a record with an amount of 10 and the description d1, as shown in the following example:

    extendedRdd.collect().toList should contain theSameElementsAs List(
      Record(10, "d1")
    )
We never call compute ourselves. Spark invokes it for each partition when an action such as collect runs the job, so it is always executed without an explicit method call.
