Chapter 3. Adopting Streams

In this chapter, you’ll learn how to adopt the Streams API. First, you’ll gain an understanding behind the motivation for the Streams API, and then you’ll learn exactly what a stream is and what it’s used for. Next, you’ll learn about various operations and data processing patterns using the Streams API, and about Collectors, which let you write more sophisticated queries. You’ll then look at a practical refactoring example. Finally, you’ll learn about parallel streams.

The Need for Streams

The Collections API is one of the most important parts of the Java API. Nearly every Java application creates and processes collections. But despite its importance, processing collections in Java is still unsatisfactory in many respects.

First, many other programming languages and libraries let you express typical data processing patterns declaratively. Think of SQL, where you can select values from a table, filter them given a condition, and group them in some form. There’s no need to spell out how to implement the query; the database figures that out for you. The benefit is that your code is easier to understand. Unfortunately, Java doesn’t give you this: you have to implement the low-level details of a data processing query yourself, using control flow constructs.

Second, how can you process really large collections efficiently? Ideally, to speed up the processing, you want to leverage multicore architectures. However, writing parallel code is hard and error-prone.

The Streams API addresses both of these issues. It introduces a new abstraction called Stream that lets you process data declaratively. Furthermore, streams can leverage multicore architectures without you having to deal with low-level constructs such as threads, locks, condition variables, and volatile variables.

For example, say you need to filter a list of invoices to find those related to a specific customer, sort them by amount of the invoice, and then extract their IDs. Using the Streams API, you can express this simply with the following query:

List<Integer> ids
    = invoices.stream()
              .filter(inv ->
                      inv.getCustomer() == Customer.ORACLE)
              .sorted(comparingDouble(Invoice::getAmount))
              .map(Invoice::getId)
              .collect(Collectors.toList());

You’ll see how this code works in more detail later in this chapter.

What Is a Stream?

So what is a stream? Informally, you can think of it as a “fancy iterator” that supports database-like operations. Technically, it’s a sequence of elements from a source that supports aggregate operations. Here’s a breakdown of the more formal definition:

Sequence of elements

A stream provides an interface to a sequenced set of values of a specific element type. However, streams don’t actually store elements; they’re computed on demand.

Source

Streams consume from a data-providing source such as collections, arrays, or I/O resources.

Aggregate operations

Streams support database-like operations and common operations from functional programming languages, such as filter, map, reduce, findFirst, allMatch, sorted, and so on.
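
To make this breakdown concrete, here’s a minimal, self-contained sketch (the names and values are illustrative): the list is the source, filter and map are aggregate operations, and no element is processed until the terminal collect operation pulls values through the pipeline:

List<String> titles = Arrays.asList("Java Training", "Support", "Scala Training");

// The list is the source; filter and map are aggregate operations.
// Elements are computed on demand when collect asks for them.
List<Integer> lengths = titles.stream()
                              .filter(title -> title.contains("Training"))
                              .map(String::length)
                              .collect(Collectors.toList());

// Streams can also consume from an I/O source, for example:
// Stream<String> lines = Files.lines(Paths.get("invoices.txt"));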

Furthermore, stream operations have two additional fundamental characteristics that differentiate them from collections:

Pipelining

Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline. This style enables certain optimizations such as laziness, short-circuiting, and loop fusion.

Internal iteration

In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you.
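
Both characteristics are easy to observe with a small sketch. The peek operation below (an intermediate operation that simply lets you look at each element as it flows by) does nothing on its own; only when the terminal operation findFirst runs are elements pulled through the pipeline, and processing stops as soon as a match is found (short-circuiting). Note also that no explicit loop appears anywhere; the iteration happens inside the stream:

List<Integer> numbers = Arrays.asList(3, 7, 8, 12, 5);

Optional<Integer> firstEvenDoubled
    = numbers.stream()
             .peek(n -> System.out.println("inspecting " + n))
             .filter(n -> n % 2 == 0)
             .map(n -> n * 2)
             .findFirst();

// Prints "inspecting 3", "inspecting 7", "inspecting 8" and then stops:
// the elements 12 and 5 are never inspected.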

Stream Operations

The Stream interface, located in the java.util.stream package, defines many operations, which can be grouped into two categories:

  • Operations such as filter, sorted, and map, which can be connected together to form a pipeline

  • Operations such as collect, findFirst, and allMatch, which terminate the pipeline and return a result

Stream operations that can be connected are called intermediate operations. They can be connected together because their return type is a Stream. Intermediate operations are “lazy” and can often be optimized. Operations that terminate a stream pipeline are called terminal operations. They produce a result from a pipeline such as a List, Integer, or even void (i.e., any nonstream type).

Let’s take a tour of some of the operations available on streams. Refer to the java.util.stream.Stream interface for the complete list.

Filtering

There are several operations that can be used to filter elements from a stream:

filter

Takes a Predicate object as an argument and returns a stream including all elements that match the predicate

distinct

Returns a stream with unique elements (according to the implementation of equals for a stream element)

limit

Returns a stream that is no longer than a certain size

skip

Returns a stream with the first n elements discarded

For example, the following query uses filter and limit to select at most five invoices whose amount is greater than 10,000:

List<Invoice> expensiveInvoices
    = invoices.stream()
              .filter(inv -> inv.getAmount() > 10_000)
              .limit(5)
              .collect(Collectors.toList());
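
The distinct and skip operations follow the same pattern. Here’s a small, self-contained sketch on a list of numbers:

List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);

List<Integer> result
    = numbers.stream()
             .skip(2)      // discard the first two elements (1 and 2)
             .distinct()   // drop the duplicate 3
             .collect(Collectors.toList());
// result is [1, 3, 2, 4]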

Matching

A common data processing pattern is determining whether some elements match a given property. You can use the anyMatch, allMatch, and noneMatch operations to help you do this. They all take a predicate as an argument and return a boolean as the result. For example, you can use allMatch to check that all elements in a stream of invoices have a value higher than 1,000:

boolean expensive =
    invoices.stream()
            .allMatch(inv -> inv.getAmount() > 1_000);
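
The anyMatch and noneMatch operations work the same way; they just answer different questions. For example, still using the hypothetical list of invoices:

boolean hasOracleInvoice =
    invoices.stream()
            .anyMatch(inv -> inv.getCustomer() == Customer.ORACLE);

boolean nothingIsFree =
    invoices.stream()
            .noneMatch(inv -> inv.getAmount() == 0);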

Finding

In addition, the Stream interface provides the operations findFirst and findAny for retrieving an element from a stream: findFirst returns the first element in the stream’s encounter order, whereas findAny is free to return any matching element (a useful relaxation for parallel pipelines). Both can be used in conjunction with other stream operations such as filter, and both return an Optional object (which we discussed in Chapter 1):

Optional<Invoice> oracleInvoice =
    invoices.stream()
            .filter(inv ->
                    inv.getCustomer() == Customer.ORACLE)
            .findAny();
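
Because the result is an Optional, you can state explicitly what should happen when no matching invoice exists. For example, this sketch prints the ID only if an invoice was found:

invoices.stream()
        .filter(inv -> inv.getCustomer() == Customer.ORACLE)
        .findAny()
        .ifPresent(inv -> System.out.println(inv.getId()));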

Mapping

Streams support the method map, which takes a Function object as an argument to turn the elements of a stream into another type. The function is applied to each element, “mapping” it into a new element.

For example, you might want to use it to extract information from each element of a stream. This code returns a list of the IDs from a list of invoices:

List<Integer> ids
    = invoices.stream()
              .map(Invoice::getId)
              .collect(Collectors.toList());
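
The target type doesn’t have to be a number. The following sketch (again using the hypothetical Invoice from the rest of this chapter) maps each invoice to a short description string:

List<String> summaries
    = invoices.stream()
              .map(inv -> inv.getCustomer() + ": " + inv.getAmount())
              .collect(Collectors.toList());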

Reducing

Another common pattern is that of combining elements from a source to provide a single value. For example, “calculate the invoice with the highest amount” or “calculate the sum of all invoices’ amounts.” This is possible using the reduce operation on streams, which repeatedly applies an operation to each element until a result is produced.

As an example of a reduce pattern, it helps to first look at how you could calculate the sum of a list using a for loop:

int sum = 0;
for (int x : numbers) {
    sum += x;
}

Each element of the list of numbers is combined iteratively using the addition operator to produce a result, essentially reducing the list of numbers into one number. There are two parameters in this code: the initial value of the sum variable—in this case 0—and the operation for combining all the elements of the list, in this case the addition operation.

Using the reduce method on streams, you can sum all the elements of a stream as shown here:

int sum = numbers.stream().reduce(0, (a, b) -> a + b);

The reduce method takes two arguments:

  • An initial value; here, 0.

  • A BinaryOperator<T> to combine two elements and produce a new value.

The reduce method essentially abstracts this pattern of repeated application. Other queries, such as “calculate the product” or “calculate the maximum,” become special use cases of the reduce method:

int product = numbers.stream().reduce(1, (a, b) -> a * b);
int max = numbers.stream().reduce(Integer.MIN_VALUE, Integer::max);
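
Coming back to the invoice examples, you can combine map and reduce to compute the sum of all the invoices’ amounts (assuming, as in the rest of this chapter, that getAmount returns a double):

double totalAmount
    = invoices.stream()
              .map(Invoice::getAmount)    // a Stream<Double> of amounts
              .reduce(0.0, Double::sum);  // combine the amounts pairwise

Note that this version boxes each amount into a Double object, which is one reason boxing appears later in this chapter as a performance consideration.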

Collectors

The operations you have seen so far either return another stream (i.e., they are intermediate operations) or return a value, such as a boolean, an int, or an Optional object (i.e., they are terminal operations). The collect method is also a terminal operation, but it is more general: it lets you accumulate the elements of a stream into a summary result of your choosing.

The argument passed to collect is an object of type java.util.stream.Collector. A Collector object essentially describes a recipe for accumulating the elements of a stream into a final result. The factory method Collectors.toList() used earlier returns a Collector object describing how to accumulate a stream into a List. However, there are many similar built-in collectors available, which you can see in the class Collectors. For example, you can group invoices by customers using Collectors.groupingBy as shown here:

Map<Customer, List<Invoice>> customerToInvoices
    = invoices.stream()
              .collect(Collectors.groupingBy(Invoice::getCustomer));
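
Other collectors follow the same recipe idea. For example, Collectors.counting can be passed to groupingBy as a downstream collector to count how many invoices each customer has:

Map<Customer, Long> invoiceCountPerCustomer
    = invoices.stream()
              .collect(Collectors.groupingBy(Invoice::getCustomer,
                                             Collectors.counting()));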

Putting It All Together

Here’s a step-by-step example so you can practice refactoring old-style Java code to use the Streams API. The following code filters invoices that are from a specific customer and related to training, sorts the resulting invoices by amount, and finally extracts the first five IDs:

List<Invoice> oracleAndTrainingInvoices = new ArrayList<>();
List<Integer> ids = new ArrayList<>();
List<Integer> firstFiveIds = new ArrayList<>();

for(Invoice inv: invoices) {
    if(inv.getCustomer() == Customer.ORACLE) {
        if(inv.getTitle().contains("Training")) {
            oracleAndTrainingInvoices.add(inv);
        }
    }
}

Collections.sort(oracleAndTrainingInvoices,
  new Comparator<Invoice>() {
    @Override
    public int compare(Invoice inv1, Invoice inv2) {
        return Double.compare(inv1.getAmount(), inv2.getAmount());
    }
});

for(Invoice inv: oracleAndTrainingInvoices) {
    ids.add(inv.getId());
}

for(int i = 0; i < 5; i++) {
    firstFiveIds.add(ids.get(i));
}

Now you’ll refactor this code step-by-step using the Streams API. First, you may notice that the code uses an intermediate container to store the invoices that have the customer Customer.ORACLE and "Training" in the title. This is a job for the filter operation:

Stream<Invoice> oracleAndTrainingInvoices
    = invoices.stream()
              .filter(inv ->
                      inv.getCustomer() == Customer.ORACLE)
              .filter(inv ->
                      inv.getTitle().contains("Training"));

Next, you need to sort the invoices by their amount. You can use the new utility method Comparator.comparingDouble (statically imported here as comparingDouble) together with the method sorted, as shown in the previous chapter:

Stream<Invoice> sortedInvoices
    = oracleAndTrainingInvoices.sorted(comparingDouble(Invoice::getAmount));

Next, you need to extract the IDs. This is a pattern for the map operation:

Stream<Integer> ids
    = sortedInvoices.map(Invoice::getId);

Finally, you’re only interested in the first five invoices. You can use the operation limit to stop after those five. Once you tidy up the code and use the collect operation, the final code is as follows:

List<Integer> firstFiveIds
    = invoices.stream()
              .filter(inv ->
                      inv.getCustomer() == Customer.ORACLE)
              .filter(inv ->
                      inv.getTitle().contains("Training"))
              .sorted(comparingDouble(Invoice::getAmount))
              .map(Invoice::getId)
              .limit(5)
              .collect(Collectors.toList());

You can observe that in the old-style Java code, each intermediate result was stored in a local variable that was written once and then read only by the next stage. Using the Streams API, these throwaway local variables are eliminated.

Parallel Streams

The Streams API supports easy data parallelism. In other words, you can explicitly ask for a stream pipeline to be performed in parallel without thinking about low-level implementation details. Behind the scenes, the Streams API will use the Fork/Join framework, which will leverage the multiple cores of your machine.

All you need to do is replace stream() with parallelStream(). For example, here’s how to filter expensive invoices in parallel:

List<Invoice> expensiveInvoices
    = invoices.parallelStream()
              .filter(inv -> inv.getAmount() > 10_000)
              .collect(Collectors.toList());

Alternatively, you can convert an existing Stream into a parallel Stream by using the parallel method:

Stream<Invoice> expensiveInvoices
    = invoices.stream()
              .filter(inv -> inv.getAmount() > 10_000);
List<Invoice> result
    = expensiveInvoices.parallel()
                       .collect(Collectors.toList());

Nonetheless, it’s not always a good idea to use parallel streams. There are several factors you need to take into consideration before you can expect a performance benefit:

Splittability

The internal implementation of parallel streams relies on how simple it is to split the source data structure so different threads can work on different parts. Data structures such as arrays are easily splittable, but other data structures such as LinkedList or files offer poor splittability.

Cost per element

The more expensive it is to calculate an element of the stream, the more benefit from parallelism you can get.

Boxing

It is preferable to use primitives instead of boxed objects where possible, as they have a lower memory footprint and better cache locality (see the sketch following this list).

Size

A larger number of data elements can produce better results because the parallel setup cost will be amortized over the processing of many elements, and the parallel speedup will outweigh the setup cost. This also depends on the processing cost per element, just mentioned.

Number of cores

Typically, the more cores available, the more parallelism you can get.
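
As an illustration of the boxing point above, the primitive stream specializations (IntStream, LongStream, and DoubleStream) let a pipeline work on raw primitives instead of wrapper objects. A minimal sketch, again using the hypothetical invoices list:

// mapToDouble produces a DoubleStream, so no Double objects are created
double total = invoices.stream()
                       .mapToDouble(Invoice::getAmount)
                       .sum();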

In practice, I advise that you benchmark and profile your code before expecting a performance improvement. The Java Microbenchmark Harness (JMH) is a popular framework maintained by Oracle that can help you with that. Without care, you could get poorer performance by simply switching to parallel streams.
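
As a rough idea of what such a benchmark looks like, here is a minimal JMH sketch; the class, method, and field names are illustrative, and JMH needs to be on the classpath:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class SumBenchmark {

    List<Integer> numbers;

    @Setup
    public void setUp() {
        // One million boxed integers; adjust the size to match your workload.
        numbers = IntStream.rangeClosed(1, 1_000_000)
                           .boxed()
                           .collect(Collectors.toList());
    }

    @Benchmark
    public int sequentialSum() {
        return numbers.stream().reduce(0, Integer::sum);
    }

    @Benchmark
    public int parallelSum() {
        return numbers.parallelStream().reduce(0, Integer::sum);
    }
}

Whether parallelSum actually wins depends on the factors just listed; on a small list or a machine with few cores, the sequential version can easily come out ahead.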

Summary

Here are the most important takeaways from this chapter:

  • A stream is a sequence of elements from a source that supports aggregate operations.

  • There are two types of stream operations: intermediate and terminal operations.

  • Intermediate operations can be connected together to form a pipeline.

  • Intermediate operations include filter, map, distinct, and sorted.

  • Terminal operations process a stream pipeline to return a result.

  • Terminal operations include allMatch, collect, and forEach.

  • Collectors are recipes for accumulating the elements of a stream into a summary result, such as a container like a List or a Map.

  • A stream pipeline can be executed in parallel.

  • There are various factors to consider when using parallel streams for enhanced performance, including splittability, cost per element, boxing, data size, and the number of cores available.
