Async methods

There is still a bottleneck in our application: when a user searches for ten keywords, the searches are executed sequentially. We could easily speed up our application by launching all the searches at the same time on different threads.

To enable Spring's asynchronous capabilities, one must use the @EnableAsync annotation. This will transparently execute any method annotated with @Async using a java.util.concurrent.Executor.

It is possible to customize the default executor by implementing the AsyncConfigurer interface. Let's create a new configuration class called AsyncConfiguration in the config package:

package masterSpringMvc.config;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    protected final Log logger = LogFactory.getLog(getClass());

    @Override
    public Executor getAsyncExecutor() {
        return Executors.newFixedThreadPool(10);
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> logger.error("Uncaught async error", ex);
    }
}

With this configuration, we ensure that no more than 10 threads will be allocated to handle our asynchronous tasks in the whole application. This is very important in a web application, where each client request is served by a dedicated thread: the more threads you use and the longer they block, the fewer client requests you can process.
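To make the effect of a fixed-size pool concrete, here is a minimal sketch that uses only the JDK, not Spring. The class name BoundedPoolDemo and the method peakConcurrency are hypothetical names chosen for this illustration, and the 20 ms sleep is just a stand-in for a slow remote call. The sketch submits more tasks than there are threads and records how many were ever running at once:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the application.
class BoundedPoolDemo {

    // Runs taskCount short blocking tasks on a fixed pool and returns the
    // highest number of tasks that were running at the same time.
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(() -> {
                    // Record the new number of running tasks if it is a maximum.
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(20); // stand-in for a slow remote call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(10, 30));
    }
}
```

Whatever the number of submitted tasks, the reported peak cannot exceed the pool size; the extra tasks simply wait in the executor's queue until a thread is free.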

Let's annotate our search method and make it asynchronous. We will need to make it return a subtype of Future, an interface from the java.util.concurrent package that represents an asynchronous result.

We will create a new implementation of the TwitterSearch class that will query the search API in different threads. The implementation is a bit tricky so I'll break it down into small parts.

First, we need to annotate the method that will query the API with the @Async annotation to tell Spring to schedule the task using our executor. Again, Spring will use a proxy to do its magic, so this method has to be in a different class from the service calling it. It would also be nice if this component could use our cache. That leads us to create this component:

@Component
private static class AsyncSearch {
    protected final Log logger = LogFactory.getLog(getClass());
    private SearchCache searchCache;

    @Autowired
    public AsyncSearch(SearchCache searchCache) {
        this.searchCache = searchCache;
    }

    @Async
    public ListenableFuture<List<LightTweet>> asyncFetch(String searchType, String keyword) {
        logger.info(Thread.currentThread().getName() + " - Searching for " + keyword);
        return new AsyncResult<>(searchCache.fetch(searchType, keyword));
    }
}

Don't create this class yet. Let's see what our service needs first.
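If the reason the @Async method must live in a separate class is not obvious, the following sketch may help. It does not use Spring; it relies on a plain JDK dynamic proxy as an analogy for proxy-based interception in general. The names ProxyBypassDemo, Fetcher, and PlainFetcher are hypothetical and exist only for this illustration. The proxy records every call it sees, and we then check which calls actually went through it:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: an analogy, not Spring's actual implementation.
class ProxyBypassDemo {

    interface Fetcher {
        void fetch();        // the method we would like to intercept
        void fetchViaThis(); // calls fetch() on "this"
    }

    static class PlainFetcher implements Fetcher {
        @Override
        public void fetch() {
            // nothing to do, we only care about who sees the call
        }

        @Override
        public void fetchViaThis() {
            this.fetch(); // self-invocation: goes straight to the target object
        }
    }

    // Returns the names of the method calls that the proxy intercepted.
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Fetcher target = new PlainFetcher();
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName());         // the "advice"
            return method.invoke(target, args); // delegate to the real object
        };
        Fetcher proxied = (Fetcher) Proxy.newProxyInstance(
                Fetcher.class.getClassLoader(),
                new Class<?>[] {Fetcher.class},
                handler);
        proxied.fetchViaThis();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

Only fetchViaThis is recorded. The inner call to fetch() is made on the target itself, so the proxy never sees it. In the same way, an @Async method called from within its own class is not routed through the proxy and would run synchronously.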

The ListenableFuture abstraction allows us to add callbacks after the completion of the future, either in the case of correct results or if an exception occurs.

The algorithm to wait for a bunch of asynchronous tasks would look like this:

@Override
public List<LightTweet> search(String searchType, List<String> keywords) {
    CountDownLatch latch = new CountDownLatch(keywords.size());
    List<LightTweet> allTweets = Collections.synchronizedList(new ArrayList<>());
    keywords
            .stream()
            .forEach(keyword -> asyncFetch(latch, allTweets, searchType, keyword));

    await(latch);
    return allTweets;
}

If you don't know the CountDownLatch class, it is just a simple blocking counter.

The await() method will wait until the latch reaches 0 to unlock the thread.

The private asyncFetch method of our service, used in the preceding code, will attach a callback to each future returned by the asyncFetch method of the AsyncSearch component. The callback will add the results to the allTweets list and decrement the latch. Once every callback has been called, the search method will return all the tweets.
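If you want to experiment with the latch pattern without Spring or the Twitter API, here is a minimal JDK-only sketch of the same idea. Everything in it is an assumption made for the illustration: the class LatchDemo, the fakeFetch stand-in for the remote call, the pool of 4 threads, and the 5 second timeout. Results are sorted before being returned only to make the outcome predictable:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the application.
class LatchDemo {

    // Stand-in for the remote search: one fake result per keyword.
    static String fakeFetch(String keyword) {
        if (keyword.isEmpty()) {
            throw new IllegalArgumentException("empty keyword");
        }
        return "tweet about " + keyword;
    }

    static List<String> searchAll(List<String> keywords) {
        CountDownLatch latch = new CountDownLatch(keywords.size());
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String keyword : keywords) {
                pool.execute(() -> {
                    try {
                        results.add(fakeFetch(keyword));
                    } catch (RuntimeException e) {
                        // a failed search contributes no result
                    } finally {
                        latch.countDown(); // count down on success and on failure
                    }
                });
            }
            // Block until every task has counted down, or give up after 5 seconds.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("searches timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        // Results arrive in completion order, so sort a copy for a stable output.
        List<String> sorted = new ArrayList<>(results);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(searchAll(Arrays.asList("java", "", "spring")));
    }
}
```

Note that the latch is decremented in a finally block. If a failing task did not count down, the waiting thread would never be released, which is why the error callback in our service also decrements the latch.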

Got it? Here is the final code:

package masterSpringMvc.search;

import masterSpringMvc.search.cache.SearchCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.social.twitter.api.SearchParameters;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

@Service
@Profile("async")
public class ParallelSearchService implements TwitterSearch {
    private final AsyncSearch asyncSearch;

    @Autowired
    public ParallelSearchService(AsyncSearch asyncSearch) {
        this.asyncSearch = asyncSearch;
    }

    @Override
    public List<LightTweet> search(String searchType, List<String> keywords) {
        CountDownLatch latch = new CountDownLatch(keywords.size());
        List<LightTweet> allTweets = Collections.synchronizedList(new ArrayList<>());

        keywords
                .stream()
                .forEach(keyword -> asyncFetch(latch, allTweets, searchType, keyword));

        await(latch);
        return allTweets;
    }

    private void asyncFetch(CountDownLatch latch, List<LightTweet> allTweets, String searchType, String keyword) {
        asyncSearch.asyncFetch(searchType, keyword)
                .addCallback(
                        tweets -> onSuccess(allTweets, latch, tweets),
                        ex -> onError(latch, ex));
    }

    private void await(CountDownLatch latch) {
        try {
            latch.await();
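        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }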
    }

    private static void onSuccess(List<LightTweet> results, CountDownLatch latch, List<LightTweet> tweets) {
        results.addAll(tweets);
        latch.countDown();
    }

    private static void onError(CountDownLatch latch, Throwable ex) {
        ex.printStackTrace();
        latch.countDown();
    }

    @Component
    private static class AsyncSearch {
        protected final Log logger = LogFactory.getLog(getClass());
        private SearchCache searchCache;

        @Autowired
        public AsyncSearch(SearchCache searchCache) {
            this.searchCache = searchCache;
        }

        @Async
        public ListenableFuture<List<LightTweet>> asyncFetch(String searchType, String keyword) {
            logger.info(Thread.currentThread().getName() + " - Searching for " + keyword);
            return new AsyncResult<>(searchCache.fetch(searchType, keyword));
        }
    }
}

Now, to use this implementation, we need to run the application with the async profile.

We can run it with multiple profiles active at the same time by separating them with commas, as follows:

--spring.profiles.active=redis,async

If we launch a search on multiple terms, we can see something like this:

pool-1-thread-3 - Searching for groovy
pool-1-thread-1 - Searching for spring
pool-1-thread-2 - Searching for java

This shows that the different searches are done in parallel.

Java 8 actually introduced a new type called CompletableFuture, which offers a much better API for manipulating futures. The main problem with completable futures is that no executor can work with them without a bit of code. This is outside the scope of this article, but you can check my blog for an article on the subject: http://geowarin.github.io/spring/2015/06/12/completable-futures-with-spring-async.html.
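To give a feel for the CompletableFuture API, here is a small sketch that fans out several searches and gathers the results using only the JDK. It does not involve Spring or @Async at all. The class CompletableSearchDemo, the fakeFetch helper, and the pool size are assumptions made for this illustration:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the application.
class CompletableSearchDemo {

    // Stand-in for the remote search: two fake results per keyword.
    static List<String> fakeFetch(String keyword) {
        return Arrays.asList(keyword + " #1", keyword + " #2");
    }

    static List<String> searchAll(List<String> keywords) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start one asynchronous search per keyword on our own executor.
            List<CompletableFuture<List<String>>> futures = keywords.stream()
                    .map(keyword -> CompletableFuture
                            .supplyAsync(() -> fakeFetch(keyword), pool)
                            // a failed search is replaced by an empty result
                            .exceptionally(ex -> Collections.emptyList()))
                    .collect(Collectors.toList());

            // Wait until every future has completed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            // Merge the results, keeping the order of the keywords.
            return futures.stream()
                    .flatMap(future -> future.join().stream())
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(searchAll(Arrays.asList("java", "spring")));
    }
}
```

Compared with the latch-based version, there is no shared list and no counter to manage: allOf does the waiting, and the results are read back from the futures in keyword order.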

Note

Disclaimer

The following sections contain a lot of JavaScript. Obviously, I think you should have a look at the code, especially if JavaScript is not your favorite language. It is time to learn it. That being said, even if WebSocket is insanely cool, it is not a requirement. You can safely skip ahead to the last chapter and deploy your application right now.
