How to do it...

Let's now implement a blocking batch transaction that reads data from a database:

  1. Create a separate Spring Boot 2.0 application, ch11-batch-db, and add the same starter POM dependencies used in the previous recipe, especially spring-boot-starter-batch and spring-oxm.
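The batch-specific additions to pom.xml look roughly like this (a sketch only; versions are managed by the Spring Boot 2.0 parent BOM, and the exact dependency set should match the previous recipe — the MySQL connector is assumed here because the recipe reads from MySQL):

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
```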
  2. Create a bootstrap class inside its core package, org.packt.process.core, that enables batch processing:
@SpringBootApplication 
@EnableBatchProcessing 
public class BatchProcessBootApplication  { 
   // refer to sources 
} 
  3. Open MySQL Workbench and create the reg_employee database schema containing the source table, employee, and the destination table, permanent.
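A DDL sketch for the two tables follows. The column names and types are inferred from the reader, processor, and writers later in this recipe; the actual schema in the book's sources may differ:

```sql
CREATE DATABASE IF NOT EXISTS reg_employee;
USE reg_employee;

-- Source table: columns inferred from the JdbcCursorItemReader's RowMapper.
CREATE TABLE employee (
    id        INT PRIMARY KEY AUTO_INCREMENT,
    empId     INT,
    deptid    INT,
    firstname VARCHAR(45),
    lastname  VARCHAR(45),
    age       INT
);

-- Destination table: matches the INSERT statement used by writer1.
CREATE TABLE permanent (
    id     INT PRIMARY KEY,
    name   VARCHAR(90),
    deptid INT
);
```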
  4. Now, create application.properties in src/main/resources, pointing the data source at reg_employee:
server.port=9006 
server.servlet.context-path=/ch11-batch 
 
spring.datasource.driverClassName=com.mysql.jdbc.Driver 
spring.datasource.url=jdbc:mysql://localhost:3306/reg_employee?autoReconnect=true&useSSL=true&serverSslCert=classpath:config/spring5packt.crt 
spring.datasource.username=root 
spring.datasource.password=spring5mysql 
spring.datasource.hikari.connection-timeout=60000 
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect 
 
#spring.batch.job.enabled=false 
 
management.port=9006 
management.address=localhost 
management.context-path=/appdetails 
# refer to sources 
  5. Copy logback.xml from the previous project and drop it inside src/main/resources to enable logging.
  6. Since this recipe requires some data from the previous employee records, copy the Employee entity model from the previous chapter and place it inside org.packt.process.core.model.data. Because Employee is only the input item, do not apply @XmlRootElement to this model class.
  7. Add a custom model, Permanent, to represent the output item of the process. This class must carry @XmlRootElement, since it will be used as a JAXB entity for XML marshalling.
  8. Inside org.packt.process.core.processor, create the following RecordProcessor, which accepts all items from ItemReader<T> and passes on only those whose age is at least 18 as items recommended for writing:
public class RecordProcessor
        implements ItemProcessor<Employee, Permanent> {

    private static final Logger log =
            LoggerFactory.getLogger(RecordProcessor.class);

    @Override
    public Permanent process(Employee item) throws Exception {
        if (item.getAge() >= 18) {
            Permanent perm = new Permanent();
            perm.setId(item.getId());
            perm.setDeptid(item.getDeptid());
            perm.setName(item.getFirstname() + " "
                    + item.getLastname());
            log.info("empId " + perm.getId() + " passed.");
            return perm;
        }
        // Returning null tells Spring Batch to filter this item out.
        return null;
    }
}
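Independently of Spring Batch, the filtering rule can be sanity-checked with a plain Java sketch. The Employee and Permanent records below are simplified stand-ins for the real model classes, not the actual entities from the project:

```java
public class RecordProcessorSketch {

    // Minimal stand-ins for the real model classes, for illustration only.
    record Employee(int id, int deptid, String firstname,
                    String lastname, int age) {}
    record Permanent(int id, int deptid, String name) {}

    // Mirrors RecordProcessor.process(): null means the item is filtered out.
    static Permanent process(Employee item) {
        if (item.age() >= 18) {
            return new Permanent(item.id(), item.deptid(),
                    item.firstname() + " " + item.lastname());
        }
        return null;
    }

    public static void main(String[] args) {
        // Age 25: passes the filter and is converted to a Permanent item.
        System.out.println(process(new Employee(1, 10, "Ana", "Cruz", 25)));
        // Age 17: filtered out, prints null.
        System.out.println(process(new Employee(2, 10, "Joy", "Tan", 17)));
    }
}
```

Note that the boundary case (age exactly 18) passes, which is why the step text says "at least 18" rather than "greater than 18".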
  9. Now let's start building the job @Configuration class by injecting DataSource and instantiating a JdbcTemplate for later use:
@Configuration 
@EnableWebFlux 
public class BatchConfig { 
   private DataSource dataSource; 
   private JdbcTemplate jdbcTemplate; 
    
   public BatchConfig(DataSource dataSource) { 
      this.dataSource = dataSource; 
      jdbcTemplate = new JdbcTemplate(dataSource); 
   } 
}
  10. Now add an ItemReader<T> bean to BatchConfig that queries records from the employee table of the reg_employee database:
@Bean
public ItemReader<Employee> reader(DataSource dataSource) {
    JdbcCursorItemReader<Employee> reader =
            new JdbcCursorItemReader<Employee>();
    reader.setSql("select * from employee");
    reader.setDataSource(dataSource);
    reader.setRowMapper(
        (ResultSet resultSet, int rowNum) -> {
            log.info("Retrieving item resultset: {}",
                    resultSet);
            if (!(resultSet.isAfterLast()) &&
                !(resultSet.isBeforeFirst())) {
                Employee emp = new Employee();
                emp.setId(resultSet.getInt("id"));
                emp.setEmpid(resultSet.getInt("empId"));
                emp.setDeptid(resultSet.getInt("deptid"));
                emp.setFirstname(
                        resultSet.getString("firstname"));
                // refer to sources
                return emp;
            } else {
                log.info("Returning null item");
                return null;
            }
        });
    return reader;
}
  11. In this recipe, we will have two writers: one that writes to the permanent table and one that generates emps.xml. Add these two writers to the BatchConfig class:
@Bean("writer1")
public ItemWriter<Permanent> writer() {
    JdbcBatchItemWriter<Permanent> writer =
            new JdbcBatchItemWriter<>();
    writer.setItemPreparedStatementSetter(setter());
    writer.setItemSqlParameterSourceProvider(
            new BeanPropertyItemSqlParameterSourceProvider<Permanent>());
    writer.setDataSource(dataSource);
    writer.setSql(
        "insert into permanent (id, name, deptid) values (?,?,?)");
    return writer;
}

@Bean
public ItemPreparedStatementSetter<Permanent> setter() {
    return (item, ps) -> {
        ps.setInt(1, item.getId());
        ps.setString(2, item.getName());
        ps.setInt(3, item.getDeptid());
    };
}

@Bean("writer2")
public ItemWriter<Permanent> xmlWriter() {
    StaxEventItemWriter<Permanent> xmlFileWriter =
            new StaxEventItemWriter<>();
    String exportFilePath =
            "./src/main/resources/emps.xml";
    xmlFileWriter.setResource(
            new FileSystemResource(exportFilePath));
    xmlFileWriter.setRootTagName("employees");

    Jaxb2Marshaller empMarshaller = new Jaxb2Marshaller();
    empMarshaller.setClassesToBeBound(Permanent.class);
    xmlFileWriter.setMarshaller(empMarshaller);
    return xmlFileWriter;
}
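Assuming Permanent maps its fields to child elements in the usual JAXB way, the generated emps.xml should look roughly like this sketch; the element and root names depend on the actual annotations on Permanent, and the data shown is purely illustrative:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<employees>
    <permanent>
        <id>1</id>
        <deptid>10</deptid>
        <name>Ana Cruz</name>
    </permanent>
    <!-- one <permanent> element per item written by step2 -->
</employees>
```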
  12. Do not forget to register the custom RecordProcessor as a bean in the container:
@Bean 
public ItemProcessor<Employee, Permanent> processor() { 
      return new RecordProcessor(); 
} 
  13. Lastly, add the step and job declarations to complete the configuration details:
@Bean
public Job importUserJob(JobBuilderFactory jobs,
        Step step1, Step step2,
        JobExecutionListener listener) {
    return jobs.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .next(step2)
            .end()
            .build();
}

@Bean("step1")
public Step step1(StepBuilderFactory stepBuilderFactory,
        ItemReader<Employee> reader,
        ItemProcessor<Employee, Permanent> processor) {
    return stepBuilderFactory.get("step1")
            .<Employee, Permanent>chunk(5)
            .reader(reader)
            .processor(processor)
            .writer(writer())
            .build();
}

@Bean("step2")
public Step step2(StepBuilderFactory stepBuilderFactory,
        ItemReader<Employee> reader,
        ItemProcessor<Employee, Permanent> processor) {
    return stepBuilderFactory.get("step2")
            .<Employee, Permanent>chunk(2)
            .reader(reader)
            .processor(processor)
            .writer(xmlWriter())
            .build();
}
}
  14. Before we end this recipe, add a listener extending JobExecutionListenerSupport inside a new package, org.packt.process.core.listener, for verification after a successful batch process execution:
@Component
public class OnCompleteJobExecListener
        extends JobExecutionListenerSupport {

    private static final Logger log =
            LoggerFactory.getLogger(OnCompleteJobExecListener.class);

    private DataSource dataSource;
    private JdbcTemplate jdbcTemplate;

    public OnCompleteJobExecListener(DataSource dataSource) {
        this.dataSource = dataSource;
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Short-lived Job Done...");

            List<Permanent> results = jdbcTemplate.query(
                "select * from permanent", (rs, row) -> {
                    Permanent permanent = new Permanent();
                    permanent.setId(rs.getInt("id"));
                    permanent.setDeptid(rs.getInt("deptid"));
                    permanent.setName(rs.getString("name"));
                    return permanent;
                });

            for (Permanent permanent : results) {
                log.info("Data is: " + permanent + " in the database.");
            }
        }
    }
}
  15. Save all files. Run the mvn clean spring-boot:run -U command, check the emps.xml file generated in src/main/resources, and verify all the output items in the permanent table.