How it works...

In this recipe, we implemented an application that uses ConcurrentHashMap to store information about operations made by users. Internally, the hash table uses the user attribute of the Operation class as the key and a ConcurrentLinkedDeque (a non-blocking concurrent deque) as the value, which stores all the operations associated with that user.

First, we filled the hash with some random data using 10 different threads. We implemented the HashFiller task for this purpose. The main difficulty with these tasks arises when a new key has to be inserted in the hash table. If two threads check for the same key and then insert it as two separate steps, both can find the key missing, and the deque created by one thread can replace the deque created by the other, so the data inserted by one of the threads is lost. This is a race condition. To solve this problem, we used the computeIfAbsent() method.

This method receives two parameters: a key and an implementation of the Function interface, which can be expressed as a lambda expression. If the key exists, the method returns the value associated with the key. If it doesn't, the method executes the specified Function, adds the key and the value returned by Function to the ConcurrentHashMap, and returns that value. In our case, when the key doesn't exist, the function creates a new instance of the ConcurrentLinkedDeque class. The main advantage of this method is that it's executed atomically: the function is applied at most once per key, and other threads that attempt update operations on the map may be blocked until the computation finishes, so the function should be short and simple.
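The following is a minimal sketch of this insertion pattern, not the recipe's actual HashFiller code. The class name, the addOperation() and fill() helpers, and the use of plain String values instead of Operation objects are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical sketch: String stands in for the recipe's Operation class.
class ComputeIfAbsentSketch {

    // Adds an operation to the user's deque, creating the deque atomically
    // the first time the user is seen.
    static void addOperation(ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash,
                             String user, String operation) {
        ConcurrentLinkedDeque<String> operations =
                userHash.computeIfAbsent(user, key -> new ConcurrentLinkedDeque<>());
        operations.add(operation);
    }

    // Starts 10 threads; each one inserts 100 operations spread over 5 users,
    // so several threads compete to create the same keys.
    static ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> fill() {
        ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    addOperation(userHash, "USER" + (j % 5), "OP" + id + "-" + j);
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return userHash;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash = fill();
        int total = 0;
        for (ConcurrentLinkedDeque<String> operations : userHash.values()) {
            total += operations.size();
        }
        // No insertion is lost: 10 threads x 100 operations each.
        System.out.println("Users: " + userHash.size() + ", operations: " + total);
    }
}
```

Because the deque is created inside computeIfAbsent(), every thread that asks for the same user receives the same deque, and no separate check-then-put step is needed.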

Then, in the main() method, we used other methods of ConcurrentHashMap to process the information stored in the hash. We used the following methods:

  • forEach(): This method receives an implementation of the BiConsumer interface, which can be expressed as a lambda expression. The two parameters of this expression represent the key and value of the element we're processing. This method applies the expression to all the elements stored in ConcurrentHashMap.
  • forEachEntry(): This method is equivalent to the previous one, but here the expression is an implementation of the Consumer interface. It receives as a parameter a Map.Entry object that stores the key and value of the entry we're processing. This is another way to express the same functionality.
  • search(): This method receives an implementation of the BiFunction interface, which can be expressed as a lambda expression. This function also receives the key and value of the entry of the ConcurrentHashMap object we're processing as parameters. The method returns a non-null value returned by BiFunction, or null if there is none, and stops processing elements once a result is found. When the method is executed in parallel, the result is not necessarily the first match in any particular order.
  • reduce(): This method receives two BiFunction implementations to reduce the elements of ConcurrentHashMap to a single value. This allows you to implement a MapReduce operation with the elements of ConcurrentHashMap. The first BiFunction (the transformer) converts the key and value of each element into a single value, and the second BiFunction (the reducer) combines the values obtained from two different elements into one.
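The four methods can be sketched as follows. This is an illustrative example rather than the recipe's main() method: the class name, the helper methods, the sample data, and the threshold value of 1 are all assumptions, and String values again stand in for Operation objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the bulk operations described above.
class BulkOperationsSketch {

    // Arbitrary value chosen for this sketch; 1 allows parallel execution.
    static final long THRESHOLD = 1;

    // Builds USER0..USER3, where USERn has n + 1 operations (10 in total).
    static ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> sampleData() {
        ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash = new ConcurrentHashMap<>();
        for (int u = 0; u < 4; u++) {
            ConcurrentLinkedDeque<String> operations = new ConcurrentLinkedDeque<>();
            for (int i = 0; i <= u; i++) {
                operations.add("OP" + i);
            }
            userHash.put("USER" + u, operations);
        }
        return userHash;
    }

    // forEach(): the BiConsumer receives the key and the value.
    static int countWithForEach(ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash) {
        AtomicInteger total = new AtomicInteger();
        userHash.forEach(THRESHOLD, (user, operations) -> total.addAndGet(operations.size()));
        return total.get();
    }

    // forEachEntry(): the Consumer receives a Map.Entry holding key and value.
    static int countWithForEachEntry(ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash) {
        AtomicInteger total = new AtomicInteger();
        userHash.forEachEntry(THRESHOLD, entry -> total.addAndGet(entry.getValue().size()));
        return total.get();
    }

    // search(): returns a user with at least minOperations operations, or null.
    static String findUserWithAtLeast(ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash,
                                      int minOperations) {
        return userHash.search(THRESHOLD,
                (user, operations) -> operations.size() >= minOperations ? user : null);
    }

    // reduce(): the transformer maps each entry to a count, the reducer adds two counts.
    static int countWithReduce(ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash) {
        Integer total = userHash.reduce(THRESHOLD,
                (user, operations) -> operations.size(),
                (a, b) -> a + b);
        return total == null ? 0 : total; // reduce() returns null for an empty map
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, ConcurrentLinkedDeque<String>> userHash = sampleData();
        System.out.println("forEach total: " + countWithForEach(userHash));
        System.out.println("forEachEntry total: " + countWithForEachEntry(userHash));
        System.out.println("User with 4 or more operations: " + findUserWithAtLeast(userHash, 4));
        System.out.println("reduce total: " + countWithReduce(userHash));
    }
}
```

The forEach() and forEachEntry() helpers accumulate into an AtomicInteger because the lambda may run on several threads at once; reduce() avoids shared state altogether, which usually makes it the more natural choice for aggregations.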

In the versions used here, all the methods described so far take parallelismThreshold as their first parameter. The Java API documentation describes this parameter as "the (estimated) number of elements needed for this operation to be executed in parallel"; that is to say, if ConcurrentHashMap has fewer elements than the value specified in the parameter, the method is executed sequentially. Otherwise (as in our case), the method is executed in parallel.
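As a rough illustration of the threshold, the following sketch runs the same reduce() call with two extreme values: Long.MAX_VALUE, which the API documentation says suppresses all parallelism, and 1, which requests as much parallelism as possible. The class name, the helper methods, and the idea of recording thread names are assumptions of this example; how many threads actually take part in the parallel run depends on the machine and on the common ForkJoinPool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch comparing two parallelismThreshold values.
class ParallelismThresholdSketch {

    // Builds a map with the entries 0 -> 0, 1 -> 1, ..., size-1 -> size-1.
    static ConcurrentHashMap<Integer, Integer> sampleData(int size) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < size; i++) {
            map.put(i, i);
        }
        return map;
    }

    // Sums all the values and records the name of every thread that
    // executed the transformer function.
    static long sum(ConcurrentHashMap<Integer, Integer> map, long threshold, Set<String> threadNames) {
        Long total = map.reduce(threshold,
                (key, value) -> {
                    threadNames.add(Thread.currentThread().getName());
                    return value.longValue();
                },
                Long::sum);
        return total == null ? 0L : total;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = sampleData(10_000);

        // Long.MAX_VALUE suppresses parallelism: only the calling thread works.
        Set<String> sequentialThreads = ConcurrentHashMap.newKeySet();
        long sequentialSum = sum(map, Long.MAX_VALUE, sequentialThreads);

        // A threshold of 1 lets the map split the work into subtasks.
        Set<String> parallelThreads = ConcurrentHashMap.newKeySet();
        long parallelSum = sum(map, 1, parallelThreads);

        System.out.println("Sequential sum: " + sequentialSum + ", threads: " + sequentialThreads);
        System.out.println("Parallel sum: " + parallelSum + ", threads: " + parallelThreads);
    }
}
```

Both calls return the same sum; only the set of threads that did the work differs. Any value between these two extremes is a trade-off between the overhead of splitting the work and the throughput gained from running it on several threads.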
