How to do it...

There are other Stream operations that are useful for data transformation, conversion, manipulation, and augmentation. To illustrate how to use these operations, perform the following steps:

  1. Let us create a service interface EmployeeTransformDataStream that declares these kinds of Stream operations:
public interface EmployeeTransformDataStream {
   Flux<String> mergeWithNames(List<String> others);
   Flux<String> concatWithNames(List<String> others);
   Flux<Tuple2<String, String>> zipWithNames(List<String> others);
   Flux<String> flatMapWithNames(List<String> others);
   Mono<Integer> countEmpRecReduce();
   Flux<GroupedFlux<String, String>> groupNames();
   Flux<String> chooseEmission(List<String> others);
   String blockedStreamData();
   Iterable<String> iterableData();
}
  2. The first method to implement combines two Flux<T> streams using the concatWith() operation: the operator subscribes to the second stream only after the first one completes, and any operators chained after concatWith() apply to the combined stream. Add the following service implementation:
@Service
public class EmployeeTransformDataStreamImpl
      implements EmployeeTransformDataStream {

   @Autowired
   private EmployeeDao employeeDaoImpl;

   @Override
   public Flux<String> concatWithNames(List<String> others) {
      Function<Employee, String> names =
            (emp) -> emp.getFirstName() + "---validated";
      Function<Employee, Mono<String>> flatMapName =
            (emp) -> Mono.just(emp).map(names);
      Flux<String> concatNames =
            Flux.fromIterable(employeeDaoImpl.getEmployees())
                .flatMap(flatMapName)
                .concatWith(Flux.fromIterable(others))
                .map(String::toUpperCase)
                .distinct()
                .sort((s1, s2) -> s1.compareTo(s2));
      return concatNames;
   }
}
  3. To add transformation, the method concatWithNames() also includes distinct(), which drops duplicate entries, and sort(), which orders the elements using the Comparator<T> interface.
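The concatenation-then-transformation pipeline above can be tried as a standalone sketch using Flux directly, with hard-coded sample names standing in for the EmployeeDao results (the class name and sample data here are illustrative, not from the recipe):

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatWithDemo {
   // Concatenate two sources, then transform the combined stream.
   static List<String> run() {
      Flux<String> employees = Flux.just("sherwin", "owin"); // stand-in for DAO names
      List<String> others = Arrays.asList("john", "john");   // note the duplicate
      return employees
            .concatWith(Flux.fromIterable(others)) // all of employees first, then others
            .map(String::toUpperCase)              // applies to BOTH sources
            .distinct()                            // drops the duplicate "JOHN"
            .sort()                                // natural (Comparable) ordering
            .collectList()
            .block();
   }

   public static void main(String[] args) {
      System.out.println(run()); // [JOHN, OWin... -> [JOHN, OWIN, SHERWIN]
   }
}
```

Because concatWith() preserves source order before the sort, the duplicate is removed and the output is deterministic.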
  4. Next, implement mergeWithNames(), which uses mergeWith() to combine two Flux<T> streams. Unlike concatWith(), mergeWith() subscribes to both sources eagerly and interleaves their events as they arrive. Here, the map() and sort() operators are chained inside the mergeWith() argument, so they apply only to the second stream:
@Override
public Flux<String> mergeWithNames(List<String> others) {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName() + "---validated";
   Function<Employee, Mono<String>> flatMapName =
         (emp) -> Mono.just(emp).map(names);
   Flux<String> mergedNames =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .flatMap(flatMapName)
             .mergeWith(Flux.fromIterable(others)
                 .map(String::toUpperCase)
                 .sort((s1, s2) -> s1.compareTo(s2)));
   return mergedNames;
}
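A minimal sketch of the same pattern, again with illustrative hard-coded names instead of the EmployeeDao, makes the scoping visible: the uppercase transformation touches only the stream passed into mergeWith():

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeWithDemo {
   static List<String> run() {
      Flux<String> employees = Flux.just("sherwin", "owin"); // untouched source
      Flux<String> others = Flux.fromIterable(Arrays.asList("john", "anna"))
            .map(String::toUpperCase)   // applies ONLY to this source
            .sort();                    // likewise, sorts only this source
      // mergeWith subscribes to both eagerly; with synchronous in-memory
      // sources like these there is no real interleaving to observe
      return employees.mergeWith(others).collectList().block();
   }

   public static void main(String[] args) {
      System.out.println(run());
   }
}
```

With time-delayed publishers the merged output would interleave; with cold in-memory lists the order simply follows subscription order.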
  5. The method zipWithNames() combines two Flux<T> streams and creates a reactor.util.function.Tuple2. A tuple is a finite ordered list of two or more values; here, we create a pair using the zipWith() operator:
@Override
public Flux<Tuple2<String, String>> zipWithNames(List<String> others) {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName() + "---validated";
   Function<Employee, Mono<String>> flatMapName =
         (emp) -> Mono.just(emp).map(names);
   Flux<Tuple2<String, String>> zipNames =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .flatMap(flatMapName)
             .zipWith(Flux.fromIterable(others));
   return zipNames;
}
  6. To retrieve the values from the Tuple2, add the following TestEmployeeTransformDataStream test class:
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@ContextConfiguration(classes = { SpringDbConfig.class,
      SpringDispatcherConfig.class })
public class TestEmployeeTransformDataStream {

   @Autowired
   private EmployeeTransformDataStream employeeTransformDataStreamImpl;

   @Test
   public void testZipWith() {
      List<String> names = Arrays.asList("John", "Johnwin",
            "Jolina", "Owin");
      employeeTransformDataStreamImpl.zipWithNames(names)
            .subscribe((tuple) -> {
               System.out.println(tuple.getT1() + "-" + tuple.getT2());
            });
   }
}
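One property worth noting is that zipWith() pairs elements by index and completes as soon as the shorter source completes. A standalone sketch (hard-coded names in place of the DAO, class name illustrative):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipWithDemo {
   static List<String> run() {
      // Three elements zipped against two: the unmatched "tom" is dropped
      return Flux.just("sherwin", "owin", "tom")
            .zipWith(Flux.just("John", "Jolina"))
            .map(t -> t.getT1() + "-" + t.getT2()) // unpack the Tuple2
            .collectList()
            .block();
   }

   public static void main(String[] args) {
      System.out.println(run()); // [sherwin-John, owin-Jolina]
   }
}
```

If the two streams may differ in length, any trailing unpaired elements are silently discarded, so align cardinalities when each pairing matters.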
  7. Aside from tuples, another operation can transform the Stream of elements into groups based on an input key. The flux generated by groupBy() is a Flux of GroupedFlux<K, V> streams, each containing a varying number of elements that share a common key. The following method groupNames() shows how to group Streams by their initial character as key:
@Override
public Flux<GroupedFlux<String, String>> groupNames() {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName().toLowerCase();
   Flux<GroupedFlux<String, String>> grpsNames =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .map(names)
             .groupBy(key -> key.charAt(0) + "");
   return grpsNames;
}
  8. To test this method, add another test case to TestEmployeeTransformDataStream that retrieves each GroupedFlux<String, String> together with its key:
@Test
public void testGroupBy() {
   employeeTransformDataStreamImpl.groupNames()
         .subscribe((grp) -> {
            grp.collectList().subscribe((list) -> {
               System.out.println("Key: " + grp.key() + " " + list);
            });
         });
}
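The grouping behavior can also be sketched standalone, flattening each GroupedFlux into a single "key=members" line (sample names and class name are illustrative, not from the recipe):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByDemo {
   static List<String> run() {
      return Flux.just("john", "johnwin", "jolina", "owin")
            .groupBy(name -> name.substring(0, 1))     // key = first letter
            .flatMap(grp -> grp.collectList()          // fold each group...
                  .map(list -> grp.key() + "=" + list)) // ...into one line
            .collectList()
            .block();
   }

   public static void main(String[] args) {
      System.out.println(run()); // e.g. [j=[john, johnwin, jolina], o=[owin]]
   }
}
```

Within each group the original emission order is preserved, which is why the j group lists john before johnwin and jolina.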
  9. The method countEmpRecReduce() computes the sum of all age values using the reduce() operator. The same operator can be used to compute other aggregate results such as the minimum, maximum, and average values. reduce() always collapses the Flux<T> result into a Mono<T> stream:
@Override
public Mono<Integer> countEmpRecReduce() {
   Function<Employee, Integer> ages =
         (emp) -> emp.getAge();
   Function<Employee, Mono<Integer>> flatMapAge =
         (emp) -> Mono.just(emp).map(ages);
   Mono<Integer> count =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .flatMap(flatMapAge)
             .reduce((total, increment) -> total + increment);
   return count;
}
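A minimal aggregation sketch, assuming hard-coded sample ages in place of the Employee records, shows reduce() collapsing a Flux<Integer> into a single Mono<Integer>:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReduceDemo {
   static Integer run() {
      // reduce() folds all emissions pairwise into one accumulated value
      Mono<Integer> total = Flux.just(25, 30, 27)
            .reduce((sum, age) -> sum + age);
      return total.block(); // 25 + 30 + 27 = 82
   }

   public static void main(String[] args) {
      System.out.println(run()); // 82
   }
}
```

Swapping the accumulator lambda for Math::min or Math::max yields the other aggregates mentioned above without changing the pipeline shape.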
  10. If the requirement asks for a conversion of Streams to a plain Object or Iterable<T>, operations such as block(), blockLast(), blockFirst(), and toIterable() are used. These operations are synchronous and should be used sparingly because they block the calling thread. The following two methods convert Streams to String and Iterable<String>:
@Override
public String blockedStreamData() {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName();
   String blockStringVal =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .map(names)
             .blockFirst();
   return blockStringVal;
}

@Override
public Iterable<String> iterableData() {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName();
   Iterable<String> namesIterate =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .map(names)
             .toIterable();
   return namesIterate;
}
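The blocking conversions can be compared side by side in a standalone sketch (sample names and class name are illustrative; a cold Flux can be safely re-subscribed for each conversion):

```java
import reactor.core.publisher.Flux;

public class BlockingDemo {
   static String run() {
      Flux<String> names = Flux.just("sherwin", "owin", "anna");
      String first = names.blockFirst(); // blocks until the first element arrives
      String last = names.blockLast();   // blocks until the stream completes
      StringBuilder all = new StringBuilder();
      for (String n : names.toIterable()) { // pull-based, blocking iteration
         all.append(n).append(" ");
      }
      return first + "|" + last + "|" + all.toString().trim();
   }

   public static void main(String[] args) {
      System.out.println(run()); // sherwin|anna|sherwin owin anna
   }
}
```

Because these calls park the current thread, they belong at the edges of a reactive application (tests, batch exports), never inside a reactive pipeline.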
  11. When implementing gateways of Streams in which the first Stream to emit data becomes the final stream, firstEmitting() is used. The following recipe determines which among the three Flux<T> streams will emit data first:
@Override
public Flux<String> chooseEmission(List<String> others) {
   Function<Employee, String> names =
         (emp) -> emp.getFirstName();
   Flux<String> sideA = Flux.fromIterable(others)
         .delayElements(Duration.ofMillis(200));
   Flux<String> sideB =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
             .map(names)
             .delayElements(Duration.ofMillis(300));
   Flux<String> sideC = Flux.fromIterable(others)
         .take(2);
   Flux<String> chosen = Flux.firstEmitting(sideA, sideB, sideC);
   return chosen;
}
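In the method above, sideC has no delay, so it should win the race. A two-contender sketch makes the selection explicit (names and class name are illustrative; note that newer Reactor releases rename this operator to Flux.firstWithSignal()):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class FirstEmittingDemo {
   static String run() {
      Flux<String> slow = Flux.just("slow")
            .delayElements(Duration.ofMillis(200)); // signals late
      Flux<String> fast = Flux.just("fast");        // signals immediately
      // firstEmitting picks whichever source signals first
      // and cancels the rest
      return Flux.firstEmitting(slow, fast).blockFirst();
   }

   public static void main(String[] args) {
      System.out.println(run()); // fast
   }
}
```

This pattern suits failover gateways: list the primary source first and fallback replicas after it, and the fastest responder serves the request.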
  12. Lastly, an operator widely used, especially in this chapter, is flatMap(), which is capable of running multiple Publisher<T> instances asynchronously and later merging them into an interleaved sequence of Stream data results that can be subscribed to in sequential or parallel mode. The following method illustrates how flatMap() provides data transformation during computations with the help of repeat() and delayElements():
@Override
public Flux<String> flatMapWithNames(List<String> others) {
   Flux<String> flatMaps = Flux.fromIterable(others)
         .flatMap((str) -> {
            return Mono.just(str).repeat(3)
                  .map(String::toUpperCase)
                  .delayElements(Duration.ofMillis(1));
         });
   return flatMaps;
}
</flatMaps>
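A standalone sketch of the same pipeline (illustrative class name and sample data) shows the fan-out arithmetic: repeat(3) re-subscribes the Mono three times in addition to the original emission, so each input name produces four uppercase copies, and flatMap merges all inner publishers into one interleaved output:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapDemo {
   static List<String> run() {
      return Flux.just("john", "owin")
            .flatMap((str) -> Mono.just(str)
                  .repeat(3)                 // 1 original + 3 repeats = 4 each
                  .map(String::toUpperCase)
                  .delayElements(Duration.ofMillis(1))) // async inner streams
            .collectList()
            .block(); // 2 names x 4 copies = 8 elements
   }

   public static void main(String[] args) {
      System.out.println(run());
   }
}
```

Because delayElements() makes each inner publisher asynchronous, the interleaving order is not guaranteed, only the totals; use concatMap() instead when strict ordering matters.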