A recommendation system recommends a product or service to a customer. The recommendation is based on two things: the products or services that customer has bought or used, and the products or services bought or used by other users who bought or used the same ones.
We have used the example from the previous section to implement a recommendation system. The description of each product includes the reviews of a number of customers, and each review includes the score the customer gave to the product.
In this example, we use these reviews to get a list of products that may interest a customer. First, we obtain the list of products purchased by the customer. Then, we obtain the list of users who have also purchased those products and the list of products purchased by those users. Finally, we sort those products by the average score given in their reviews. The result is the list of products suggested to the customer.
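The three steps can be sketched with plain collections before any streams are involved. This is a minimal sketch, not the implementation used in this section. It assumes a hypothetical in-memory map, reviews, from each customer ID to the products that customer reviewed and the score given. The class RecommendationSketch and the method recommend() are illustrative names.

```java
import java.util.*;

// Hypothetical sketch: reviews.get(customer).get(product) is the score that customer gave that product.
class RecommendationSketch {

    static List<Map.Entry<String, Double>> recommend(Map<String, Map<String, Integer>> reviews,
                                                     String customer) {
        // Step 1: products purchased (reviewed) by the customer.
        Set<String> purchased = reviews.getOrDefault(customer, Map.of()).keySet();

        // Step 2: users who purchased at least one of those products.
        Set<String> similarUsers = new HashSet<>();
        for (Map.Entry<String, Map<String, Integer>> e : reviews.entrySet()) {
            if (!Collections.disjoint(e.getValue().keySet(), purchased)) {
                similarUsers.add(e.getKey());
            }
        }

        // Step 3: every score those users gave to every product they purchased.
        Map<String, List<Integer>> scoresByProduct = new HashMap<>();
        for (String u : similarUsers) {
            for (Map.Entry<String, Integer> r : reviews.get(u).entrySet()) {
                scoresByProduct.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
            }
        }

        // Step 4: average score per product, sorted in descending order.
        List<Map.Entry<String, Double>> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : scoresByProduct.entrySet()) {
            double sum = 0;
            for (int s : e.getValue()) {
                sum += s;
            }
            result.add(Map.entry(e.getKey(), sum / e.getValue().size()));
        }
        result.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        return result;
    }
}
```

As in the stream version of this section, the customer is counted among the users who bought the same products. As a result, the customer's own products also appear in the result.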
We have added two new classes to the ones used in the previous section. These classes are:
- ProductReview: This class extends the Product class with two new attributes.
- ProductRecommendation: This class stores the information about a product recommendation.

Let's see the details of both classes.
The ProductReview class extends the Product class, adding two new attributes:
- buyer: The ID of the customer who reviewed the product
- value: The score that customer gave to the product in the review

The class includes the definition of the attributes, the corresponding getXXX() and setXXX() methods, and a constructor. The constructor creates a ProductReview object from a Product object and the values of the new attributes. It's very simple, so its source code is not included.
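Because the source of ProductReview is not included, here is a minimal sketch under stated assumptions. The Product class from the previous section is not shown here, so the sketch replaces it with a hypothetical, reduced Product that has only a title and a list of reviews. It uses an equally hypothetical Review class. The attribute names buyer and value are inferred from the getBuyer() and getValue() calls used later in this section. The constructor signature matches the call new ProductReview(p, r.getUser(), r.getValue()).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Product class of the previous section (title and reviews only).
class Product {
    private String title;
    private List<Review> reviews = new ArrayList<>();

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public List<Review> getReviews() { return reviews; }
    public void setReviews(List<Review> reviews) { this.reviews = reviews; }
}

// Hypothetical stand-in for a review: the user who wrote it and the score given.
class Review {
    private final String user;
    private final int value;

    Review(String user, int value) { this.user = user; this.value = value; }
    public String getUser() { return user; }
    public int getValue() { return value; }
}

// Sketch of ProductReview: a Product plus the customer who reviewed it and the score given.
class ProductReview extends Product {
    private String buyer;
    private int value;

    // Copies the data of the original product and sets the two new attributes.
    public ProductReview(Product source, String buyer, int value) {
        setTitle(source.getTitle());
        setReviews(source.getReviews());
        this.buyer = buyer;
        this.value = value;
    }

    public String getBuyer() { return buyer; }
    public void setBuyer(String buyer) { this.buyer = buyer; }
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }
}
```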
The ProductRecommendation class stores the necessary information for a product recommendation, which includes the following:
- title: The title of the product we are recommending
- value: The score of that recommendation, which is calculated as the average score of all the reviews for that product

This class includes the definition of the attributes, the corresponding getXXX() and setXXX() methods, and the implementation of the compareTo() method (the class implements the Comparable interface). This method allows us to sort the recommendations in descending order by their value. It's very simple, so its source code is not included.
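A possible shape for ProductRecommendation is sketched below. The attribute names follow the description above. The (String, double) constructor matches the call made later in the main stream, but the rest is an assumption, not the book's source. The key detail is that compareTo() compares the other object's value with this one's, which reverses the natural order so that sorting puts the highest score first.

```java
// Hypothetical sketch of ProductRecommendation (attributes as described in the text).
class ProductRecommendation implements Comparable<ProductRecommendation> {
    private String title;
    private double value;

    public ProductRecommendation(String title, double value) {
        this.title = title;
        this.value = value;
    }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }

    // Arguments swapped with respect to ascending order: higher values sort first.
    @Override
    public int compareTo(ProductRecommendation other) {
        return Double.compare(other.value, this.value);
    }
}
```

With this ordering, Collections.sort() or a stream's sorted() produces the list from the highest to the lowest score without passing an explicit comparator.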
We have implemented our algorithm in the ConcurrentMainRecommendation class to obtain the list of products recommended to a customer. This class implements the main() method, which receives as a parameter the ID of the customer whose recommended products we want to obtain. We have the following code:
```java
    public static void main(String[] args) {
        String user = args[0];
        Path file = Paths.get("data");
        try {
            Date start, end;
            start = new Date();
```
We have used several streams to transform the data into the final solution. The first one loads the whole list of Product objects from their files:
```java
            // collect() returns a ConcurrentLinkedDeque, which is not a List
            ConcurrentLinkedDeque<Product> productList = Files
                    .walk(file, FileVisitOption.FOLLOW_LINKS)
                    .parallel()
                    .filter(f -> f.toString().endsWith(".txt"))
                    .collect(ConcurrentLinkedDeque<Product>::new,
                            new ConcurrentLoaderAccumulator(),
                            ConcurrentLinkedDeque::addAll);
```
This stream has the following elements:
- We start the stream with the walk() method of the Files class. This method creates a stream to process all the files and directories under the data directory.
- We use the parallel() method to convert the stream into a concurrent one.
- We filter the stream to keep only the files with the .txt extension.
- We use the collect() method to obtain a ConcurrentLinkedDeque of Product objects. It's very similar to the one used in the previous section, with the difference that we use another accumulator object. In this case, we use the ConcurrentLoaderAccumulator class, which we will describe later.

Once we have the list of products, we organize those products in a map using the identifier of the customer as the key. We use the ProductReview class to store the information about the customers of the products. We create one ProductReview object for each review of a Product. We use the following stream to make the transformation:
```java
            Map<String, List<ProductReview>> productsByBuyer = productList
                    .parallelStream()
                    .<ProductReview>flatMap(p -> p.getReviews().stream()
                            .map(r -> new ProductReview(p, r.getUser(), r.getValue())))
                    .collect(Collectors.groupingByConcurrent(p -> p.getBuyer()));
```
This stream has the following elements:
- We start the stream with the parallelStream() method of the productList object, so we create a concurrent stream.
- We use the flatMap() method to convert the stream of Product objects into a single stream of ProductReview objects.
- We use the collect() method to generate the final map. In this case, we use the predefined collector generated by the groupingByConcurrent() method of the Collectors class. The returned collector generates a map where the keys are the different values of the buyer attribute. The values are lists of ProductReview objects with the information about the products purchased by that user. As the method name indicates, this transformation is done concurrently.

The next stream is the most important one in this example. We take the products purchased by a customer and generate the recommendations for that customer. It's a two-phase process performed by one stream. In the first phase, we obtain the users who purchased the same products as the original customer. In the second phase, we generate a map with the products purchased by those customers, together with all the reviews those customers made. This is the code for that stream:
```java
            // get(user) returns null, and this line throws a NullPointerException,
            // if the customer has no reviews in the data
            Map<String, List<ProductReview>> recommendedProducts = productsByBuyer.get(user)
                    .parallelStream()
                    .map(p -> p.getReviews())
                    .flatMap(Collection::stream)
                    .map(r -> r.getUser())
                    .distinct()
                    .map(productsByBuyer::get)
                    .flatMap(Collection::stream)
                    .collect(Collectors.groupingByConcurrent(p -> p.getTitle()));
```
We have the following elements in that stream:
- First, we get the list of products purchased by the user and generate a concurrent stream using the parallelStream() method.
- Then, we get the list of reviews of each of those products using the map() method.
- At this point, we have a stream of List<Review> objects. We convert that stream into a stream of Review objects using the flatMap() method. Now we have a stream with all the reviews of the products purchased by the user.
- Then, we use the map() method to transform that stream into a stream of String objects with the names of the users who made the reviews.
- Then, we remove the duplicate users using the distinct() method. Now we have a stream of String objects with the names of the users who purchased the same products as the original user.
- Then, we use the map() method to transform each customer into their list of purchased products.
- At this point, we have a stream of List<ProductReview> objects. We convert that stream into a stream of ProductReview objects using the flatMap() method.
- Finally, we generate a map using the collect() method and the groupingByConcurrent() collector. The keys of the map are the titles of the products. The values are the lists of ProductReview objects with the reviews made by the customers obtained earlier.

To finish our recommendation algorithm, we need one last step. For every product, we want to calculate its average review score and sort the list in descending order so that the top-rated products appear first. To make that transformation, we use an additional stream:
```java
            List<ProductRecommendation> recommendations = recommendedProducts
                    .entrySet()
                    .parallelStream()
                    .map(entry -> new ProductRecommendation(
                            entry.getKey(),
                            entry.getValue().stream().mapToInt(p -> p.getValue()).average().getAsDouble()))
                    .sorted()
                    .collect(Collectors.toList());
            end = new Date();
            recommendations.forEach(pr -> System.out.println(pr.getTitle() + ": " + pr.getValue()));
            System.out.println("Execution Time: " + (end.getTime() - start.getTime()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
We process the map obtained in the previous step. For each product, we process its list of reviews to generate a ProductRecommendation object. The value of this object is the average score of the reviews. It is calculated with a stream that uses the mapToInt() method to transform the stream of ProductReview objects into a stream of integers, and the average() method to get the average of all the numbers in the stream. Because average() returns an OptionalDouble, we call getAsDouble() to obtain the value. This is safe here because every product in the map has at least one review.
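The averaging step can be isolated as follows. This is a minimal sketch in which a hypothetical list of integer scores stands in for the value attribute of the ProductReview objects. It also shows the empty case. Here, average() returns an empty OptionalDouble, getAsDouble() would throw a NoSuchElementException, and the sketch falls back to 0.0 instead.

```java
import java.util.List;
import java.util.OptionalDouble;

class AverageScore {
    // Average of the scores, computed the same way as for each product in the last stream.
    static double average(List<Integer> scores) {
        OptionalDouble avg = scores.stream().mapToInt(Integer::intValue).average();
        // Empty stream: return 0.0 instead of calling getAsDouble(), which would throw.
        return avg.orElse(0.0);
    }
}
```

Note that the average of integers is a double, so two scores of 5 and 4 give 4.5, not 4.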
Finally, the recommendations list contains the ProductRecommendation objects. The sorted() method of the same stream has already ordered them by their compareTo() method, that is, in descending order of score. We then use the forEach() method of the list to write the final list to the console.
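The whole pipeline can be tried on a few in-memory products. This is a minimal sketch under several assumptions:
- It uses hypothetical Java 16+ records (ReviewData, ProductData and ProductReviewData) in place of the Review, Product and ProductReview classes.
- It returns Map.Entry pairs instead of ProductRecommendation objects so that it stays self-contained.
- Unlike the code above, it uses getOrDefault(), so an unknown customer yields an empty list instead of a NullPointerException.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical, simplified data types standing in for Review, Product and ProductReview.
record ReviewData(String user, int value) {}
record ProductData(String title, List<ReviewData> reviews) {}
record ProductReviewData(ProductData product, String buyer, int value) {}

class RecommendationPipeline {

    static List<Map.Entry<String, Double>> recommend(List<ProductData> products, String user) {
        // One ProductReviewData per review, grouped by the customer who wrote it.
        Map<String, List<ProductReviewData>> productsByBuyer = products.parallelStream()
                .flatMap(p -> p.reviews().stream()
                        .map(r -> new ProductReviewData(p, r.user(), r.value())))
                .collect(Collectors.groupingByConcurrent(ProductReviewData::buyer));

        // Users who reviewed the same products as `user`, then everything they reviewed, by title.
        Map<String, List<ProductReviewData>> recommended = productsByBuyer
                .getOrDefault(user, List.of())
                .parallelStream()
                .map(pr -> pr.product().reviews())
                .flatMap(Collection::stream)
                .map(ReviewData::user)
                .distinct()
                .map(productsByBuyer::get)
                .flatMap(Collection::stream)
                .collect(Collectors.groupingByConcurrent(pr -> pr.product().title()));

        // Average score per title, highest first.
        return recommended.entrySet().parallelStream()
                .map(e -> Map.entry(e.getKey(),
                        e.getValue().stream().mapToInt(ProductReviewData::value).average().getAsDouble()))
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .collect(Collectors.toList());
    }
}
```

The call to productsByBuyer::get never returns null in this pipeline. Every user who appears in a review was also used as a key when the map was built.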
To implement this example, we have used the ConcurrentLoaderAccumulator class as the accumulator function in the collect() method. This method transforms the stream of Path objects, with the routes of all the files to process, into a ConcurrentLinkedDeque of Product objects. This is the source code of this class:
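```java
import java.nio.file.Path;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.BiConsumer;

public class ConcurrentLoaderAccumulator implements
        BiConsumer<ConcurrentLinkedDeque<Product>, Path> {

    @Override
    public void accept(ConcurrentLinkedDeque<Product> list, Path path) {
        // Load the product stored in the file and add it to the deque
        Product product = ProductLoader.load(path);
        list.add(product);
    }
}
```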
It implements the BiConsumer interface. The accept() method uses the ProductLoader class (explained earlier in this chapter) to load the product information from the file. It then adds the resulting Product object to the ConcurrentLinkedDeque received as a parameter.
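The three-argument form of collect() used with ConcurrentLoaderAccumulator takes a supplier, an accumulator and a combiner. The following minimal sketch reproduces that pattern without files. A hypothetical ParsingAccumulator turns each input string into an Integer, which stands in for ProductLoader.load(path), and adds it to a ConcurrentLinkedDeque. In a parallel stream, each partial result gets its own deque from the supplier, and the combiner (addAll) merges the partial deques.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.BiConsumer;

// Hypothetical accumulator: parsing a string stands in for loading a Product from a file.
class ParsingAccumulator implements BiConsumer<ConcurrentLinkedDeque<Integer>, String> {
    @Override
    public void accept(ConcurrentLinkedDeque<Integer> deque, String text) {
        deque.add(Integer.parseInt(text.trim()));
    }
}

class CollectDemo {
    static ConcurrentLinkedDeque<Integer> load(List<String> inputs) {
        return inputs.parallelStream()
                .filter(s -> !s.trim().isEmpty())               // like the .txt filter: skip unusable inputs
                .collect(ConcurrentLinkedDeque<Integer>::new,   // supplier: a new container per partial result
                         new ParsingAccumulator(),               // accumulator: adds one element
                         ConcurrentLinkedDeque::addAll);         // combiner: merges two partial containers
    }
}
```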
As with other examples in the book, we have implemented a serial version of this example to check that parallel streams improve the performance of the application. To implement this serial version, we have to follow these steps:
- Replace the ConcurrentLinkedDeque data structure with the List or ArrayList data structures.
- Replace the parallelStream() method with the stream() method.
- Replace the groupingByConcurrent() method with the groupingBy() method.

You can see the serial version of this example in the source code of the book.
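The substitutions listed above can be illustrated on the grouping step. This is a minimal sketch, not the book's serial version. It uses a hypothetical Rating record (Java 16+) that holds a buyer and a title. It groups the records once with stream() and groupingBy() (serial) and once with parallelStream() and groupingByConcurrent() (concurrent).

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical flattened review: which buyer purchased which product.
record Rating(String buyer, String title) {}

class SerialVsConcurrent {
    // Serial version: stream() and groupingBy().
    static Map<String, List<String>> serial(List<Rating> ratings) {
        return ratings.stream()
                .collect(Collectors.groupingBy(Rating::buyer,
                        Collectors.mapping(Rating::title, Collectors.toList())));
    }

    // Concurrent version: parallelStream() and groupingByConcurrent().
    static Map<String, List<String>> concurrent(List<Rating> ratings) {
        return ratings.parallelStream()
                .collect(Collectors.groupingByConcurrent(Rating::buyer,
                        Collectors.mapping(Rating::title, Collectors.toList())));
    }
}
```

Both methods produce the same keys and the same titles per buyer. However, groupingByConcurrent() is an unordered collector, so the order of the titles inside each list can differ between runs. Compare the results as sets rather than as lists.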
To compare the serial and concurrent versions of our recommendation system, we have obtained the recommended products for three users:
- A2JOYUS36FLG4Z
- A2JW67OY8U6HHK
- A2VE83MZF98ITY
For these three users, we have executed both versions using the JMH framework (http://openjdk.java.net/projects/code-tools/jmh/), which allows you to implement microbenchmarks in Java. Using a benchmarking framework is a better solution than simply measuring time with methods such as currentTimeMillis() or nanoTime(). We have executed them 10 times on a computer with a four-core processor and calculated the average execution time of those 10 runs. These are the results in milliseconds:
| Version | A2JOYUS36FLG4Z | A2JW67OY8U6HHK | A2VE83MZF98ITY |
|---|---|---|---|
| Serial | 4848.672 | 4830.051 | 4817.216 |
| Concurrent | 2454.003 | 2458.003 | 2527.194 |
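We can draw the following conclusions:
- The concurrent version is roughly twice as fast as the serial version for all three users.
- Each version takes a similar time for the three users.

If we compare the concurrent and serial versions for the second user using the speed-up, we obtain the following result:

$$S = \frac{T_{serial}}{T_{concurrent}} = \frac{4830.051}{2458.003} \approx 1.97$$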
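The speed-up for all three users can be computed the same way. This small sketch simply divides the serial mean by the concurrent mean from the table. The class and method names are illustrative.

```java
// Speed-up = serial time / concurrent time, using the mean times (ms) from the table above.
class SpeedUp {
    static double speedUp(double serialMs, double concurrentMs) {
        return serialMs / concurrentMs;
    }

    public static void main(String[] args) {
        String[] users = {"A2JOYUS36FLG4Z", "A2JW67OY8U6HHK", "A2VE83MZF98ITY"};
        double[] serial = {4848.672, 4830.051, 4817.216};
        double[] concurrent = {2454.003, 2458.003, 2527.194};
        for (int i = 0; i < users.length; i++) {
            System.out.printf("%s: %.2f%n", users[i], speedUp(serial[i], concurrent[i]));
        }
    }
}
```

Running it prints speed-ups of 1.98, 1.97 and 1.91 for the three users.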