The second example – a recommendation system

A recommendation system recommends a product or a service to a customer based on the products or services that customer has bought or used, and on the products or services bought or used by other users who have bought or used the same ones.

We have used the example explained in the previous section as the basis for a recommendation system. Each product description includes the reviews of a number of customers, and each review includes the score the customer gave to the product.

In this example, you will use these reviews to get a list of the products that may be interesting to a customer. First, we obtain the list of products purchased by the customer. Then, we obtain the list of users who have also purchased those products and the list of products purchased by those users. Finally, we sort those products using the average score given in their reviews. The sorted list contains the products suggested to the customer.

Common classes

We have added two new classes to the ones used in the previous section. These classes are:

  • ProductReview: This class extends the Product class with two new attributes
  • ProductRecommendation: This class stores the information of the recommendation of a product

Let's see the details of both classes.

The ProductReview class

The ProductReview class extends the Product class adding two new attributes:

  • buyer: This attribute stores the name of a customer of the product
  • value: This attribute stores the score this customer gave to the product in their review

The class includes the definition of the attributes, the corresponding getXXX() and setXXX() methods, and a constructor that creates a ProductReview object from a Product object and the values of the new attributes. It's very simple, so its source code is not included.
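As an illustration only, a minimal sketch of such a class could look like the following. It is not the book's source: the Product and Review classes here are simplified stand-ins (assumptions) that keep only a title and a list of reviews, and the score type is assumed to be int.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins: the chapter's Product class has more
// attributes than the title and review list kept here.
class ProductReviewSketch {

    static class Review {
        private final String user;
        private final int value; // score type assumed to be int

        Review(String user, int value) {
            this.user = user;
            this.value = value;
        }

        String getUser() { return user; }
        int getValue() { return value; }
    }

    static class Product {
        private String title;
        private List<Review> reviews = new ArrayList<>();

        String getTitle() { return title; }
        void setTitle(String title) { this.title = title; }
        List<Review> getReviews() { return reviews; }
        void setReviews(List<Review> reviews) { this.reviews = reviews; }
    }

    // A Product plus the buyer of one review and the score that buyer gave
    static class ProductReview extends Product {
        private String buyer;
        private int value;

        // Copies the product's attributes (sharing its review list)
        // and stores the two new ones
        ProductReview(Product source, String buyer, int value) {
            setTitle(source.getTitle());
            setReviews(source.getReviews());
            this.buyer = buyer;
            this.value = value;
        }

        String getBuyer() { return buyer; }
        void setBuyer(String buyer) { this.buyer = buyer; }
        int getValue() { return value; }
        void setValue(int value) { this.value = value; }
    }

    public static void main(String[] args) {
        Product product = new Product();
        product.setTitle("Book A");
        product.getReviews().add(new Review("alice", 5));
        ProductReview review = new ProductReview(product, "alice", 5);
        System.out.println(review.getTitle() + " " + review.getBuyer() + " " + review.getValue());
    }
}
```

Because the constructor takes the source Product, a stream can build one ProductReview per review with a call such as new ProductReview(p, r.getUser(), r.getValue()).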

The ProductRecommendation class

The ProductRecommendation class stores the necessary information for a product recommendation that includes the following:

  • title: The title of the product we are recommending
  • value: The score of that recommendation, which is calculated as the average score of all the reviews for that product

This class includes the definition of the attributes, the corresponding getXXX() and setXXX() methods, and the implementation of the compareTo() method (the class implements the Comparable interface) that allows us to sort the recommendations in descending order by their value. It's very simple, so its source code is not included.
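A minimal sketch of such a class follows, assuming a double value as produced by the average() call in the main class. It is an illustration, not the book's code. The detail that matters is compareTo(): comparing other with this (instead of this with other) reverses the natural order, so sorting yields descending values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RecommendationSketch {

    static class ProductRecommendation implements Comparable<ProductRecommendation> {
        private String title;
        private double value;

        ProductRecommendation(String title, double value) {
            this.title = title;
            this.value = value;
        }

        String getTitle() { return title; }
        void setTitle(String title) { this.title = title; }
        double getValue() { return value; }
        void setValue(double value) { this.value = value; }

        // Arguments swapped so that higher values sort first (descending order)
        @Override
        public int compareTo(ProductRecommendation other) {
            return Double.compare(other.value, this.value);
        }
    }

    public static void main(String[] args) {
        List<ProductRecommendation> list = new ArrayList<>();
        list.add(new ProductRecommendation("B", 3.5));
        list.add(new ProductRecommendation("A", 4.8));
        list.add(new ProductRecommendation("C", 2.0));
        Collections.sort(list);
        list.forEach(r -> System.out.println(r.getTitle() + ": " + r.getValue()));
    }
}
```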

The recommendation system – the main class

We have implemented our algorithm in the ConcurrentMainRecommendation class to obtain the list of products recommended to a customer. This class implements the main() method, which receives as a parameter the ID of the customer whose recommended products we want to obtain. We have the following code:

    public static void main(String[] args) {
        String user = args[0];
        Path file = Paths.get("data");
        try {
            Date start, end;
            start=new Date();

We use several streams to transform the data into the final solution. The first one loads the whole list of Product objects from their files:

            // collect() returns a ConcurrentLinkedDeque, which is not a List
            ConcurrentLinkedDeque<Product> productList = Files
                .walk(file, FileVisitOption.FOLLOW_LINKS)
                .parallel()
                .filter(f -> f.toString().endsWith(".txt"))
                .collect(ConcurrentLinkedDeque<Product>::new,
                         new ConcurrentLoaderAccumulator(),
                         ConcurrentLinkedDeque::addAll);

This stream has the following elements:

  • We start the stream with the walk() method of the Files class. This method will create a stream to process all the files and directories under the data directory.
  • Then, we use the parallel() method to convert the stream into a concurrent one.
  • Then, we keep only the files with the .txt extension.
  • Finally, we use the collect() method to obtain a ConcurrentLinkedDeque of Product objects. It's very similar to the one used in the previous section, with the difference that we use another accumulator object. In this case, we use the ConcurrentLoaderAccumulator class that we will describe later.
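The loading stream can be tried out in isolation with the sketch below. It is not the chapter's code: the hypothetical TitleAccumulator plays the role of ConcurrentLoaderAccumulator and, instead of ProductLoader, simply reads the first line of each file as a title. It also wraps the Files.walk() stream in try-with-resources, because that stream holds open directory handles until it is closed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

class LoaderSketch {

    // Hypothetical accumulator in the style of ConcurrentLoaderAccumulator:
    // it "loads" a file by reading its first line as a title
    static class TitleAccumulator implements BiConsumer<ConcurrentLinkedDeque<String>, Path> {
        @Override
        public void accept(ConcurrentLinkedDeque<String> deque, Path path) {
            try {
                List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
                deque.add(lines.isEmpty() ? "" : lines.get(0));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    static ConcurrentLinkedDeque<String> load(Path dir) throws IOException {
        // try-with-resources closes the directory handles opened by walk()
        try (Stream<Path> paths = Files.walk(dir, FileVisitOption.FOLLOW_LINKS)) {
            return paths
                    .parallel()
                    .filter(f -> f.toString().endsWith(".txt"))
                    // supplier: new container; accumulator: add one file;
                    // combiner: merge two partial containers with addAll()
                    .collect(ConcurrentLinkedDeque<String>::new,
                            new TitleAccumulator(),
                            ConcurrentLinkedDeque::addAll);
        }
    }

    // Creates a temporary directory with two .txt files and one .dat file
    static int demo() {
        try {
            Path dir = Files.createTempDirectory("products");
            Files.write(dir.resolve("a.txt"), Arrays.asList("Title A", "more data"), StandardCharsets.UTF_8);
            Files.write(dir.resolve("b.txt"), Arrays.asList("Title B"), StandardCharsets.UTF_8);
            Files.write(dir.resolve("notes.dat"), Arrays.asList("ignored"), StandardCharsets.UTF_8);
            return load(dir).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2: only the .txt files are loaded
    }
}
```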

Once we have the list of products, we organize them in a map that uses the identifier of the customer as the key. We use the ProductReview class to store the information about the customers of the products, creating one ProductReview object for each review of a Product. We use the following stream to make the transformation:

            Map<String, List<ProductReview>> productsByBuyer = productList
                    .parallelStream()
                    .<ProductReview>flatMap(p -> p.getReviews().stream()
                            .map(r -> new ProductReview(p, r.getUser(), r.getValue())))
                    .collect(Collectors.groupingByConcurrent(p -> p.getBuyer()));

This stream has the following elements:

  • We start the stream with the parallelStream() method of the productList object, so we create a concurrent stream.
  • Then, we use the flatMap() method to convert the stream of Product objects into a single stream of ProductReview objects.
  • Finally, we use the collect() method to generate the final map. In this case, we have used the predefined collector generated by the groupingByConcurrent() method of the Collectors class. The returned collector generates a map whose keys are the different values of the buyer attribute and whose values are lists of ProductReview objects with the information of the products purchased by each user. As the method name indicates, this transformation is done in a concurrent way.
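The flatMap() plus groupingByConcurrent() combination can be tried out with the simplified, hypothetical classes below. In this sketch ProductReview keeps only the title, buyer, and score instead of extending a full Product class, which is an assumption made for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupByBuyerSketch {

    static class Review {
        final String user;
        final int value;
        Review(String user, int value) { this.user = user; this.value = value; }
    }

    static class Product {
        final String title;
        final List<Review> reviews;
        Product(String title, Review... reviews) {
            this.title = title;
            this.reviews = Arrays.asList(reviews);
        }
    }

    // Simplified: holds only what the grouping needs
    static class ProductReview {
        final String title;
        final String buyer;
        final int value;
        ProductReview(Product p, String buyer, int value) {
            this.title = p.title;
            this.buyer = buyer;
            this.value = value;
        }
        String getBuyer() { return buyer; }
    }

    static Map<String, List<ProductReview>> groupByBuyer(List<Product> products) {
        return products.parallelStream()
                // one ProductReview per review of each product
                .flatMap(p -> p.reviews.stream()
                        .map(r -> new ProductReview(p, r.user, r.value)))
                // concurrent grouping: buyer -> products reviewed by that buyer
                .collect(Collectors.groupingByConcurrent(ProductReview::getBuyer));
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("Book A", new Review("alice", 5), new Review("bob", 3)),
                new Product("Book B", new Review("alice", 4)));
        Map<String, List<ProductReview>> byBuyer = groupByBuyer(products);
        System.out.println(byBuyer.get("alice").size()); // 2
        System.out.println(byBuyer.get("bob").size());   // 1
    }
}
```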

The next stream is the most important one in this example. We take the products purchased by a customer and generate the recommendations for that customer. It is a two-phase process carried out by a single stream. In the first phase, we obtain the users who purchased the products purchased by the original customer. In the second phase, we generate a map with the products purchased by those users and all the reviews of those products made by them. This is the code for that stream:

            Map<String,List<ProductReview>> recommendedProducts=productsByBuyer.get(user)
                    .parallelStream()
                    .map(p -> p.getReviews())
                    .flatMap(Collection::stream)
                    .map(r -> r.getUser())
                    .distinct()
                    .map(productsByBuyer::get)
                    .flatMap(Collection::stream)
                    .collect(Collectors.groupingByConcurrent(p -> p.getTitle()));

We have the following elements in that stream:

  • First, we get the list of products purchased by the user and generate a concurrent stream using the parallelStream() method.
  • Then, we get all the reviews of those products using the map() method.
  • At this moment, we have a stream of List<Review> objects. We convert it into a stream of Review objects using the flatMap() method. Now we have a stream with all the reviews of the products purchased by the user.
  • Then, we transform that stream into a stream of String objects with the names of the users who made the reviews.
  • Then, we get the unique names of the users with the distinct() method. Now we have a stream of String objects with the names of the users who purchased the same products as the original user.
  • Then, we use the map() method to transform each customer into their list of purchased products.
  • At this moment, we have a stream of List<ProductReview> objects. We convert that stream into a stream of ProductReview objects using the flatMap() method.
  • Finally, we generate a map of products using the collect() method and the groupingByConcurrent() collector. The keys of the map are the titles of the products, and the values are the lists of ProductReview objects with the reviews made by the customers obtained earlier.
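The two phases can be traced on a tiny data set with the following sketch. The classes are simplified stand-ins (assumptions), but candidates() mirrors the chain of operations in the stream above; the hypothetical byBuyer() helper builds the buyer map with a serial groupingBy() for brevity.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TwoPhaseSketch {

    static class Review {
        final String user;
        final int value;
        Review(String user, int value) { this.user = user; this.value = value; }
    }

    static class Product {
        final String title;
        final List<Review> reviews;
        Product(String title, List<Review> reviews) { this.title = title; this.reviews = reviews; }
        String getTitle() { return title; }
        List<Review> getReviews() { return reviews; }
    }

    // As in the chapter, a ProductReview is a Product plus one buyer and score
    static class ProductReview extends Product {
        final String buyer;
        final int value;
        ProductReview(Product p, String buyer, int value) {
            super(p.title, p.reviews);
            this.buyer = buyer;
            this.value = value;
        }
        String getBuyer() { return buyer; }
    }

    // Hypothetical helper: buyer -> products reviewed by that buyer
    static Map<String, List<ProductReview>> byBuyer(List<Product> products) {
        return products.stream()
                .flatMap(p -> p.getReviews().stream()
                        .map(r -> new ProductReview(p, r.user, r.value)))
                .collect(Collectors.groupingBy(ProductReview::getBuyer));
    }

    static Map<String, List<ProductReview>> candidates(
            Map<String, List<ProductReview>> productsByBuyer, String user) {
        return productsByBuyer.get(user)
                .parallelStream()
                .map(ProductReview::getReviews)   // phase 1: reviews of the user's products
                .flatMap(Collection::stream)
                .map(r -> r.user)                 // users who reviewed those products
                .distinct()
                .map(productsByBuyer::get)        // phase 2: products of those users
                .flatMap(Collection::stream)
                .collect(Collectors.groupingByConcurrent(ProductReview::getTitle));
    }

    public static void main(String[] args) {
        Product a = new Product("A", Arrays.asList(new Review("u1", 5), new Review("u2", 4)));
        Product b = new Product("B", Arrays.asList(new Review("u2", 3)));
        Product c = new Product("C", Arrays.asList(new Review("u3", 5)));
        Map<String, List<ProductReview>> map = byBuyer(Arrays.asList(a, b, c));
        System.out.println(candidates(map, "u1").keySet());
    }
}
```

With this data, u1 bought A, which u2 also reviewed, so the result contains A and B but not C. As in the chapter's stream, the original customer is among the distinct users, so the customer's own products also appear in the result.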

To finish our recommendation algorithm, we need one last step. For every product, we want to calculate the average score of its reviews and sort the list in descending order to show the top-rated products first. To make that transformation, we use an additional stream:

            List<ProductRecommendation> recommendations = recommendedProducts
                    .entrySet()
                    .parallelStream()
                    .map(entry -> new ProductRecommendation(
                            entry.getKey(),
                            entry.getValue().stream()
                                    .mapToInt(p -> p.getValue())
                                    .average()
                                    .getAsDouble()))
                    .sorted()
                    .collect(Collectors.toList());
            end = new Date();
            recommendations.forEach(pr -> System.out.println(pr.getTitle() + ": " + pr.getValue()));

            System.out.println("Execution Time: "+(end.getTime()- start.getTime()));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

We process the map obtained in the previous step. For each product, we process its list of reviews to generate a ProductRecommendation object. The value of this object is the average score of the reviews, calculated with another stream: the mapToInt() method transforms the stream of ProductReview objects into a stream of integers, and the average() method returns the average of all the numbers in that stream.

Finally, the sorted() method of the same stream sorts the ProductRecommendation objects using their compareTo() method, that is, in descending order by value, and the collect() method stores them in the recommendations list. We use the forEach() method of that list to write the final list to the console.
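The averaging and sorting step can be sketched in isolation as follows. To keep it short, each title maps directly to a list of integer scores instead of a List<ProductReview>, which is an assumption for the example. Note that average() returns an OptionalDouble, whose getAsDouble() throws NoSuchElementException for an empty stream; it is safe here because every title has at least one score.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class AverageAndSortSketch {

    static class ProductRecommendation implements Comparable<ProductRecommendation> {
        final String title;
        final double value;
        ProductRecommendation(String title, double value) { this.title = title; this.value = value; }

        // Descending order by value
        @Override
        public int compareTo(ProductRecommendation o) { return Double.compare(o.value, value); }
    }

    static List<ProductRecommendation> recommend(Map<String, List<Integer>> scoresByTitle) {
        return scoresByTitle.entrySet()
                .parallelStream()
                .map(e -> new ProductRecommendation(
                        e.getKey(),
                        // OptionalDouble: present because each list is non-empty
                        e.getValue().stream().mapToInt(Integer::intValue).average().getAsDouble()))
                .sorted()                      // uses compareTo(): best first
                .collect(Collectors.toList()); // keeps the sorted encounter order
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> scores = new HashMap<>();
        scores.put("A", Arrays.asList(5, 4));
        scores.put("B", Arrays.asList(3));
        scores.put("C", Arrays.asList(5, 5, 4));
        recommend(scores).forEach(r -> System.out.println(r.title + ": " + r.value));
    }
}
```

Even on a parallel stream, sorted() followed by collect(Collectors.toList()) yields a list in sorted order, because toList() respects the encounter order that sorted() establishes.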

The ConcurrentLoaderAccumulator class

To implement this example, we have used the ConcurrentLoaderAccumulator class as the accumulator function in the collect() method that transforms the stream of Path objects, with the paths of all the files to process, into a ConcurrentLinkedDeque of Product objects. This is the source code of this class:

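    import java.nio.file.Path;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.function.BiConsumer;

    public class ConcurrentLoaderAccumulator implements
            BiConsumer<ConcurrentLinkedDeque<Product>, Path> {

        // Loads the product stored in the file and adds it to the shared deque
        @Override
        public void accept(ConcurrentLinkedDeque<Product> list, Path path) {
            Product product = ProductLoader.load(path);
            list.add(product);
        }
    }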

It implements the BiConsumer interface. The accept() method uses the ProductLoader class (explained earlier in this chapter) to load the product information from the file and adds the resulting Product object to the ConcurrentLinkedDeque received as a parameter.

The serial version

As with other examples in the book, we have implemented a serial version of this example to check that parallel streams improve the performance of the application. To implement this serial version, we have to follow these steps:

  • Replace the ConcurrentLinkedDeque data structure with the List or ArrayList data structures
  • Replace the parallelStream() method with the stream() method
  • Replace the groupingByConcurrent() method with the groupingBy() method
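The last two changes can be illustrated on a small scale with the sketch below, which groups words by length (a toy data set, not the chapter's products). One difference worth knowing: groupingByConcurrent() is an unordered collector, so the order of the elements inside each list may differ from the serial groupingBy() result, which keeps the encounter order. The groups themselves contain the same elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SerialVsConcurrentSketch {

    // Concurrent version: parallelStream() + groupingByConcurrent()
    static Map<Integer, List<String>> concurrent(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(String::length));
    }

    // Serial version: stream() + groupingBy()
    static Map<Integer, List<String>> serial(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "map", "list", "deque", "set", "sort");
        System.out.println(serial(words));
        System.out.println(concurrent(words));
    }
}
```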

You can see the serial version of this example in the source code of the book.

Comparing the two versions

To compare the serial and concurrent versions of our recommendation system, we have obtained the recommended products for three users:

  • A2JOYUS36FLG4Z
  • A2JW67OY8U6HHK
  • A2VE83MZF98ITY

For these three users, we have executed both versions using the JMH framework (http://openjdk.java.net/projects/code-tools/jmh/), which allows you to implement microbenchmarks in Java. Using a benchmarking framework is a better solution than simply measuring time with methods such as currentTimeMillis() or nanoTime(). We have executed them 10 times on a computer with a four-core processor and calculated the average execution time of those 10 runs. These are the results in milliseconds:

 

    Version       A2JOYUS36FLG4Z   A2JW67OY8U6HHK   A2VE83MZF98ITY
    Serial              4848.672         4830.051         4817.216
    Concurrent          2454.003         2458.003         2527.194

We can draw the following conclusions:

  • The results obtained are very similar for the three users
  • The execution time of the concurrent version is lower than that of the serial version for all three users

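If we compare the concurrent and serial versions using the speed-up (the execution time of the serial version divided by the execution time of the concurrent version), for the second user we obtain 4830.051 / 2458.003 ≈ 1.97; that is, the concurrent version is almost twice as fast.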
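The same calculation can be repeated for all three users with a small sketch like the following, which simply divides each serial time in the table by the corresponding concurrent time.

```java
// Speed-up = serial time / concurrent time, using the measurements (ms)
// reported in the table above
class SpeedUpSketch {

    static double speedUp(double serialMs, double concurrentMs) {
        return serialMs / concurrentMs;
    }

    public static void main(String[] args) {
        String[] users = {"A2JOYUS36FLG4Z", "A2JW67OY8U6HHK", "A2VE83MZF98ITY"};
        double[] serial = {4848.672, 4830.051, 4817.216};
        double[] concurrent = {2454.003, 2458.003, 2527.194};
        for (int i = 0; i < users.length; i++) {
            System.out.printf("%s: %.2f%n", users[i], speedUp(serial[i], concurrent[i]));
        }
    }
}
```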
