Let's create a standalone application that transforms XML to a text file using the following steps:
- Using Eclipse STS, create a Maven project, ch11-batch-sync, that contains the Spring Boot 2.0.0.M2 starter POM dependencies, such as actuator and JDBC, with some support plugins such as the MySQL connector.
- Add the starter POM dependency for the latest Spring Batch 4.0:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
- Since XML parsing is involved, add the Spring OXM module with its XStream dependency in pom.xml:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.9</version>
</dependency>
- Create a core package, org.packt.process.core, and drop a bootstrap class inside that enables batch processing and task scheduling:
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class SyncBatchBootApplication {
    // refer to sources
}
- Copy logback.xml from the previous project and drop it inside src/main/resources to enable logging.
- Now, inside src/main/resources, create the application.properties file with all the server, actuator, and HikariCP datasource autoconfiguration details. Use the newly created batchproc database in the updated spring.datasource.url property. This database will be populated with metadata tables by Spring Batch once the application starts:
server.port=9007
server.servlet.context-path=/ch11-batch-sync

spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/batchproc?autoReconnect=true&useSSL=true&serverSslCert=classpath:config/spring5packt.crt
spring.datasource.username=root
spring.datasource.password=spring5mysql
spring.datasource.hikari.connection-timeout=60000
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect

management.port=9007
management.address=localhost
management.context-path=/appdetails

endpoints.info.enabled=true
endpoints.info.sensitive=false
endpoints.info.id=info
info.app.description=Department Microservice
info.app.version=1.0.0
endpoints.sensitive=false
endpoints.shutdown.sensitive=false
endpoints.shutdown.enabled=true
- For the data models, we will utilize the hrs data from the previous chapter, so copy the Department entity model to the org.packt.process.core.model.data package.
- Since the Java Architecture for XML Binding (JAXB) parsing technique will be used in this recipe, apply @XmlRootElement and @XmlElement to all the domain models to make them JAXB classes:
@XmlRootElement(name = "department")
public class Department implements Serializable {

    private Integer id;
    private Integer deptid;
    private String name;

    @XmlElement
    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    @XmlElement
    public Integer getDeptid() {
        return deptid;
    }

    public void setDeptid(Integer deptid) {
        this.deptid = deptid;
    }

    @XmlElement
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
- Create an additional JAXB class that will contain all the Department elements or records, and place this inside the model package:
@XmlRootElement(name = "departments")
public class Departments implements Serializable {

    private List<Department> department;

    public List<Department> getDepartment() {
        return department;
    }

    public void setDepartment(List<Department> department) {
        this.department = department;
    }
}
Do not apply @XmlElement to the instance variable since the Department class is already a JAXB entity, record, or element. Otherwise, parsing errors will be encountered.
- Let's now start building the Spring Batch components needed to transform our XML data to a text file. It is easiest to begin the configuration with the reader and writer components. Create a package, org.packt.process.core.reader, that contains a custom org.springframework.batch.item.ItemReader<T> implementation whose read() method is executed repeatedly to feed the source data into the engine. This method returns null once all the data for the current run has been consumed:
public class DepartmentItemReader implements ItemReader<Department> {

    private final String filename;
    private ItemReader<Department> delegate;

    public DepartmentItemReader(final String filename) {
        this.filename = filename;
    }

    @Override
    public Department read() throws Exception {
        if (delegate == null) {
            delegate = new IteratorItemReader<>(depts());
        }
        return delegate.read();
    }

    private List<Department> depts() throws FileNotFoundException, JAXBException {
        JAXBContext context = JAXBContext.newInstance(
            Departments.class, Department.class);
        Unmarshaller unmarshaller = context.createUnmarshaller();
        Departments deptList = (Departments) unmarshaller
            .unmarshal(new FileInputStream(filename));
        return deptList.getDepartment();
    }
}
The implementation uses JAXB unmarshalling to read all the data from the source file. The extracted data becomes the items of the batch process.
- Create another package, org.packt.process.core.writers, and drop an org.springframework.batch.item.ItemWriter<T> implementation in it, whose write() method is responsible for flushing all items into another file channel. The following is an ItemWriter<T> that writes all items to a text file but discards writes during rollback:
public class DepartmentItemWriter implements ItemWriter<Department>, Closeable {

    private PrintWriter writer;

    public DepartmentItemWriter() {
        OutputStream out = null;
        try {
            out = new FileOutputStream("output.txt");
        } catch (FileNotFoundException e) {
            out = System.out;
        } finally {
            this.writer = new PrintWriter(out);
        }
    }

    @Override
    public void write(List<? extends Department> items) throws Exception {
        for (Department item : items) {
            writer.println(item.getName() + " " + item.getDeptid());
        }
    }

    @PreDestroy
    @Override
    public void close() throws IOException {
        writer.close();
    }
}
The text file should only contain the department ID and the name of the item.
- The data transformation or conversion happens only when org.springframework.batch.item.ItemProcessor<I,O> intervenes in the process by accepting the data read by ItemReader<T> through its process() method. ItemProcessor provides the business logic, plus a set of rules and constraints for data conversion, and returns an output item to be collected by ItemWriter<T>. The method returns null if the input item should be filtered out of the write. Create a new package, org.packt.process.core.processor, that contains an ItemProcessor<I,O> implementation that only passes through a Department object whose name length is greater than or equal to 5:
public class DeptNameProcessor implements ItemProcessor<Department, Department> {

    @Override
    public Department process(final Department item) throws Exception {
        if (item.getName().length() >= 5) {
            return item;
        }
        return null;
    }
}
- To impose validation rules, another processor called org.springframework.batch.item.validator.ValidatingItemProcessor provides additional tasks to filter out unnecessary or unimportant items based on the business rules of the requirement. The following class omits a Department input object that has a department ID lower than 400:
public class DeptIDValidProcesor extends ValidatingItemProcessor<Department> {

    public DeptIDValidProcesor() {
        super(item -> {
            if (item.getDeptid() < 400) {
                throw new ValidationException(
                    "Department ID lower than 400...");
            }
        });
        setFilter(true);
    }
}
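To see why setFilter(true) matters, here is a minimal plain-Java sketch (no Spring Batch, and the class and method names are hypothetical) of the filter-on-exception behavior that ValidatingItemProcessor provides: the validator throws to reject an item, and with filtering enabled the rejected item is silently dropped instead of failing the step:

```java
import java.util.function.Consumer;

public class FilterSketch {

    // Mimics ValidatingItemProcessor: the validator throws to reject an item;
    // with filter=true the rejected item becomes null (dropped) rather than
    // aborting the whole step with an exception.
    public static Integer processDeptId(int deptid) {
        Consumer<Integer> validator = id -> {
            if (id < 400) {
                throw new IllegalArgumentException("Department ID lower than 400...");
            }
        };
        try {
            validator.accept(deptid);
            return deptid;   // valid item passes through unchanged
        } catch (IllegalArgumentException ex) {
            return null;     // filter=true semantics: drop the item silently
        }
    }

    public static void main(String[] args) {
        System.out.println(processDeptId(5656)); // passes validation
        System.out.println(processDeptId(345));  // filtered out
    }
}
```

Without setFilter(true), the ValidationException would propagate and mark the step execution as failed instead of simply skipping the item.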
- At this point, we are ready to create the @Configuration job class that requires the DepartmentItemReader, DepartmentItemWriter, DeptNameProcessor, and DeptIDValidProcesor bean objects. The following job configuration class implements single-item batch processing. The batch process uses an org.springframework.batch.core.step.tasklet.Tasklet interface whose execute() method is run repeatedly until all the source data is consumed. Each execution is wrapped in an org.springframework.batch.core.Step that holds all the information about one read-write phase of the job. None of these step executions will work without the injected JobBuilderFactory and StepBuilderFactory:
@Configuration
@EnableWebFlux
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobCreators;

    @Autowired
    private StepBuilderFactory stepCreators;

    public Job deptBatchJob() {
        return jobCreators.get("deptReportJob")
            .start(taskletStep())
            .build();
    }

    @Bean
    public Step taskletStep() {
        return stepCreators.get("taskletStep")
            .tasklet(tasklet())
            .build();
    }

    @Bean
    public Tasklet tasklet() {
        return (contrib, chunkCtx) -> {
            return RepeatStatus.FINISHED;
        };
    }
}
- Our job configuration also implements a bulk batch process by calling the chunk() method of the step builder. This method accepts a commit interval of n items, which determines how many items are read and processed before the whole chunk is handed to the writer. The following snippet is added to BatchConfig to enable bulk batch processing:
@Bean
public Step chunkStep() {
    return stepCreators.get("chunkStep")
        .<Department, Department>chunk(5)
        .build();
}
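The chunk-oriented flow described above can be sketched in plain Java (no Spring Batch; the class name and the name-length filter are illustrative stand-ins for the real reader, processor, and writer):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkFlowSketch {

    // Illustrates chunk-oriented processing: read up to chunkSize items,
    // process each one (null means the item was filtered out), then hand
    // the surviving items to the writer as one chunk.
    public static List<List<String>> run(List<String> source, int chunkSize) {
        List<List<String>> written = new ArrayList<>();
        Iterator<String> reader = source.iterator();        // plays the ItemReader role
        while (reader.hasNext()) {
            List<String> chunk = new ArrayList<>();
            int read = 0;
            while (read < chunkSize && reader.hasNext()) {
                String item = reader.next();
                read++;                                     // chunk size counts items READ
                String processed = item.length() >= 5 ? item : null; // ItemProcessor role
                if (processed != null) {
                    chunk.add(processed);                   // filtered items shrink the chunk
                }
            }
            if (!chunk.isEmpty()) {
                written.add(chunk);                         // ItemWriter role: one write per chunk
            }
        }
        return written;
    }

    public static void main(String[] args) {
        // "HR" is filtered out (length < 5); the other two form a single chunk
        System.out.println(run(List.of("Kiosk", "Engineering Department", "HR"), 2));
    }
}
```

Note that the commit interval counts items read, not items written, so a chunk handed to the writer may contain fewer items than the configured size when the processor filters some out.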
- Update the following method to execute both per item and per chunk batch processing:
public Job deptBatchJob() {
    return jobCreators.get("deptReportJob")
        .start(taskletStep())
        .next(chunkStep())
        .build();
}
- Now inject all the reader, writer, and processor beans into BatchConfig, and ensure that you convert these beans from the default singleton scope to @StepScope:
@StepScope
@Bean
public ItemReader<Department> reader() {
    return new DepartmentItemReader("depts.xml");
}

@StepScope
@Bean
public ItemProcessor<Department, Department> processor() {
    CompositeItemProcessor<Department, Department> processor =
        new CompositeItemProcessor<>();
    processor.setDelegates(Arrays.asList(
        new DeptNameProcessor(), new DeptIDValidProcesor()));
    return processor;
}

@StepScope
@Bean
public ItemWriter<Department> writer() {
    return new DepartmentItemWriter();
}
- Update the chunkStep() method to include the reader, writer, and processor:
@Bean
public Step chunkStep() {
    return stepCreators.get("chunkStep")
        .<Department, Department>chunk(5)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}
- To complete our job configuration class, inject JobLauncher into BatchConfig to execute deptBatchJob() with a timestamp job parameter (procId) that distinguishes one job execution from another. Create a scheduler to run the JobLauncher job every 5,000 milliseconds:
@Autowired
private JobLauncher jobLauncher;

@Scheduled(fixedRate = 5000)
public void startJob() throws Exception {
    JobExecution execution = jobLauncher.run(
        deptBatchJob(),
        new JobParametersBuilder()
            .addLong("procId", System.nanoTime())
            .toJobParameters());
}
- Create a sample depts.xml file and just drop it inside the root project folder:
<departments>
    <department>
        <id>111</id>
        <deptid>5656</deptid>
        <name>Human Resources Department</name>
    </department>
    <department>
        <id>1234</id>
        <deptid>6777</deptid>
        <name>Sports and Wellness Department</name>
    </department>
    <department>
        <id>1456</id>
        <deptid>345</deptid>
        <name>Kiosk</name>
    </department>
    <department>
        <id>1459</id>
        <deptid>23232</deptid>
        <name>Engineering Department</name>
    </department>
    ...
</departments>
- Save all files. Run mvn clean spring-boot:run -U and check the output.txt file in the root project folder.
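For the four sample records shown above (the elided records aside), only departments whose name is at least five characters long and whose deptid is 400 or more survive both processors, so output.txt should contain lines like:

```
Human Resources Department 5656
Sports and Wellness Department 6777
Engineering Department 23232
```

The Kiosk record passes the name-length filter (exactly five characters) but is dropped by DeptIDValidProcesor because its deptid of 345 is below 400.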
If you encounter any errors related to a duplicate job already running during launch, disable the spring.batch.job.enabled property in application.properties and rerun the Maven command given earlier.
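A minimal sketch of that fix in application.properties; this prevents Spring Boot from auto-launching the job at startup, leaving it entirely to our @Scheduled launcher:

```properties
# Stop Spring Boot from running all Job beans automatically on startup;
# the scheduled JobLauncher in BatchConfig remains the only trigger.
spring.batch.job.enabled=false
```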
- Open the MySQL Workbench and check the batchproc database after launch:
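If you prefer the MySQL console over Workbench, a quick sketch of how to inspect the metadata tables that Spring Batch creates in batchproc (table and column names below follow the standard Spring Batch schema):

```sql
-- List the Spring Batch metadata tables created in the batchproc schema
SHOW TABLES;

-- Show recent job executions and their outcomes
SELECT ji.JOB_NAME, je.STATUS, je.START_TIME, je.END_TIME
FROM BATCH_JOB_INSTANCE ji
JOIN BATCH_JOB_EXECUTION je
  ON ji.JOB_INSTANCE_ID = je.JOB_INSTANCE_ID
ORDER BY je.START_TIME DESC;
```

Each scheduled run of deptReportJob adds a new row to BATCH_JOB_EXECUTION, which is why the procId timestamp parameter is needed to keep the job instances distinct.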