Overview
This section explains how to create a chunk model job. For the architecture of the chunk model, refer to Spring Batch architecture.
The components of a chunk model job are explained first.
Components
The components of a chunk model job are shown below. One job is implemented by combining these components in the job Bean definition file.
Sr. No. | Name | Role | Mandatory settings | Mandatory implementation |
---|---|---|---|---|
1 | ItemReader | Interface to fetch data from various resources. | | |
2 | ItemProcessor | Interface for processing data from input to output. | | |
3 | ItemWriter | Interface for the output of data to various resources. | | |
The points in this table are as follows.
- If data only needs to be transferred from an input resource to an output resource in a simple way, the job can be implemented with settings alone.
- ItemProcessor should be implemented only when it is required.
The following sections explain how to implement a job using these components.
How to use
This section explains how to implement a chunk model job, starting with the job configuration and then the implementation of components.
Job configuration
Define how the elements that constitute a chunk model job are combined in the Bean definition file. An example is shown below, followed by an explanation of how the components relate to each other.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">
    <!-- (1) -->
    <import resource="classpath:META-INF/spring/job-base-context.xml"/>
    <!-- (2) -->
    <context:annotation-config/>
    <!-- (3) -->
    <context:component-scan
        base-package="org.terasoluna.batch.functionaltest.app.common" />
    <!-- (4) -->
    <mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
        factory-ref="jobSqlSessionFactory"/>
    <!-- (5) -->
    <bean id="reader"
          class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
          p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
    <!-- (6) -->
    <!-- Item Processor -->
    <!-- The ItemProcessor is defined as a Bean by annotation, so it is not defined here -->
    <!-- (7) -->
    <bean id="writer"
          class="org.springframework.batch.item.file.FlatFileItemWriter"
          scope="step"
          p:resource="file:#{jobParameters[outputFile]}">
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                          p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
                </property>
            </bean>
        </property>
    </bean>
    <!-- (8) -->
    <batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (9) -->
        <batch:step id="jobCustomerList01.step01"> <!-- (10) -->
            <batch:tasklet transaction-manager="jobTransactionManager"> <!-- (11) -->
                <batch:chunk reader="reader"
                             processor="processor"
                             writer="writer"
                             commit-interval="10" /> <!-- (12) -->
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
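import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component("processor") // (6)
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
    // omitted
}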
Sr. No. | Explanation |
---|---|
(1) | Import the settings that always load the Bean definitions required when using TERASOLUNA Batch 5.x. |
(2) | Enable Bean definition using annotations. Use it together with (3) when implementing ItemProcessor, Listener, etc. |
(3) | Set the base package targeted by component scan. When defining Beans using annotations, use it together with (2). |
(4) | MyBatis-Spring configuration. |
(5) | ItemReader configuration. |
(6) | ItemProcessor can be defined by annotation through (2) and (3), so there is no need to define it in the Bean definition file. |
(7) | ItemWriter configuration. |
(8) | Job configuration. |
(9) | JobRepository configuration. |
(10) | Step configuration. |
(11) | Tasklet configuration. |
(12) | Chunk model job configuration. |
Tuning of commit-interval
In the above example, commit-interval is set to 10 records, but the appropriate value depends on the available machine resources and the characteristics of the job. For a job that processes data by accessing multiple resources, throughput may peak at a value somewhere between 10 and 100 records. For a job that simply transfers data between an input resource and an output resource in 1:1 correspondence, throughput may keep improving up to 5,000 or even 10,000 records. Set commit-interval to a provisional value at first, then tune it for each job.
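To make the effect of commit-interval concrete, the following is a minimal plain-Java sketch of a chunk loop. It is a simplified model written for this explanation, not Spring Batch's actual implementation; the class name `ChunkLoopSketch` and the method `run` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model of chunk-oriented processing (an assumption for illustration,
// not the framework's real code).
class ChunkLoopSketch {

    // Reads records one by one and "commits" each time commitInterval records
    // have been collected. Returns the number of commits performed.
    static int run(Iterator<Integer> reader, int commitInterval,
                   List<List<Integer>> committedChunks) {
        int commits = 0;
        List<Integer> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());            // read (and process) one record
            if (chunk.size() == commitInterval) {
                committedChunks.add(chunk);      // write the whole chunk, then commit
                commits++;
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {                  // last, partially filled chunk
            committedChunks.add(chunk);
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            input.add(i);
        }
        List<List<Integer>> chunks = new ArrayList<>();
        int commits = run(input.iterator(), 10, chunks);
        // 25 records with an interval of 10 give chunks of 10, 10 and 5.
        System.out.println(commits + " commits, last chunk size "
                + chunks.get(chunks.size() - 1).size()); // prints "3 commits, last chunk size 5"
    }
}
```

A larger interval means fewer commits for the same input, at the cost of holding more records in memory per chunk, which is why the value has to be measured per job.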
Implementation of components
This section mainly explains how to implement ItemProcessor.
Refer to the following for other components.
- ItemReader, ItemWriter
- Listener
Implementation of ItemProcessor
This section explains how to implement ItemProcessor.
As the interface below shows, ItemProcessor is responsible for creating one record for the output resource from one record fetched from the input resource. In other words, ItemProcessor is where the business logic for a single record is implemented.
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}
The type parameters I and O of the interface can be the same type or different types, as shown below.
Using the same type means partially modifying the input data.
Using different types means generating output data based on the input data.
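import java.math.BigDecimal;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class AmountUpdateItemProcessor implements
        ItemProcessor<SalesPlanDetail, SalesPlanDetail> {
    @Override
    public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
        // Same type for input and output: update one field and return the item.
        item.setAmount(new BigDecimal("1000"));
        return item;
    }
}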
import javax.inject.Inject;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        // Different types: look up the customer and build a new output item.
        Customer customer = customerRepository.findOne(readItem.getCustomerId());
        SalesPlanDetail writeItem = new SalesPlanDetail();
        writeItem.setBranchId(customer.getChargeBranchId());
        writeItem.setYear(readItem.getYear());
        writeItem.setMonth(readItem.getMonth());
        writeItem.setCustomerId(readItem.getCustomerId());
        writeItem.setAmount(readItem.getAmount());
        return writeItem;
    }
}
Returning null from ItemProcessor
Returning null from ItemProcessor means that the data is not passed to the subsequent process (Writer); in other words, the data is filtered out. This is useful for validating input data. For details, refer to Input check.
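The filtering behavior can be sketched as follows. This is a minimal plain-Java illustration under stated assumptions: the `ItemProcessor` interface is redeclared locally with the same shape as the one shown above so that the sketch compiles without Spring Batch, and `PositiveAmountProcessor` and `NullFilterSketch` are hypothetical names that model how a step might skip filtered records.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as the ItemProcessor interface shown above,
// declared here only so the sketch compiles without Spring Batch.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical processor: passes positive amounts through and filters out the rest.
class PositiveAmountProcessor implements ItemProcessor<BigDecimal, BigDecimal> {
    @Override
    public BigDecimal process(BigDecimal amount) {
        if (amount == null || amount.signum() <= 0) {
            return null; // filtered: this record never reaches the writer
        }
        return amount;
    }
}

class NullFilterSketch {
    // Models the step: only non-null results are handed to the writer.
    static List<BigDecimal> itemsForWriter(List<BigDecimal> input) {
        PositiveAmountProcessor processor = new PositiveAmountProcessor();
        List<BigDecimal> output = new ArrayList<>();
        for (BigDecimal item : input) {
            BigDecimal result = processor.process(item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<BigDecimal> input = new ArrayList<>();
        input.add(new BigDecimal("1000"));
        input.add(new BigDecimal("-5"));
        input.add(new BigDecimal("250"));
        System.out.println(itemsForWriter(input)); // prints [1000, 250]
    }
}
```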
To increase process throughput of ItemProcessor
As shown in the previous implementation example, an ItemProcessor implementation class may access resources such as a DB or files. Because ItemProcessor is executed for every record of input data, even a small amount of I/O per record adds up to a large amount of I/O over the whole job, so suppressing I/O as much as possible is important for increasing throughput. One method is to load the required data into memory in advance by using a Listener (described later), so that most of the processing in ItemProcessor completes between CPU and memory. However, this consumes a large amount of memory per job, so not everything can be held in memory. Decide which data to hold in memory based on I/O frequency and data size. This point is also covered in Input/Output of data.
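The idea of loading reference data once and then working from memory can be sketched in plain Java as below. Everything here is an assumption for illustration: `CachedBranchLookup`, its `beforeStep` method, and the `Supplier` standing in for a bulk query are hypothetical. In a real job the load would be triggered from a Listener callback rather than called by hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: loads customer-to-branch data once, then serves lookups from memory.
class CachedBranchLookup {
    // Stand-in for a bulk query against the DB (an assumption, not a real repository API).
    private final Supplier<Map<String, String>> bulkLoader;
    private Map<String, String> branchIdByCustomerId = new HashMap<>();

    CachedBranchLookup(Supplier<Map<String, String>> bulkLoader) {
        this.bulkLoader = bulkLoader;
    }

    // Call once before the step starts: a single I/O operation for the whole job.
    void beforeStep() {
        branchIdByCustomerId = new HashMap<>(bulkLoader.get());
    }

    // Per-record processing: a memory lookup only, no I/O.
    String process(String customerId) {
        return branchIdByCustomerId.get(customerId);
    }

    public static void main(String[] args) {
        int[] loads = {0}; // counts how often the bulk query runs
        CachedBranchLookup lookup = new CachedBranchLookup(() -> {
            loads[0]++;
            Map<String, String> data = new HashMap<>();
            data.put("C001", "B01");
            data.put("C002", "B02");
            return data;
        });
        lookup.beforeStep();
        for (String id : new String[] {"C001", "C002", "C001"}) {
            lookup.process(id);
        }
        System.out.println("bulk loads: " + loads[0]); // prints "bulk loads: 1"
    }
}
```

The trade-off is the one described above: the map lives in memory for the duration of the step, so this only suits reference data that is small enough and read often enough to justify it.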
Use multiple ItemProcessors at the same time
If a general-purpose ItemProcessor is provided and applied to each job, this can be implemented by linking multiple ItemProcessors with CompositeItemProcessor.
Note that the ItemProcessors are executed in the order specified in the delegates attribute.
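Why the order of delegates matters can be shown with a small plain-Java sketch. It models chaining with `java.util.function.Function` instead of the real CompositeItemProcessor class, so `ChainSketch` and `processInOrder` are hypothetical names. As an assumption of this sketch, the chain stops as soon as one delegate returns null, in line with the filtering rule described earlier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of applying several processors in sequence (illustration only).
class ChainSketch {

    // Applies each delegate in list order. If a delegate returns null, the item is
    // treated as filtered and the remaining delegates are skipped.
    static <T> T processInOrder(List<Function<T, T>> delegates, T item) {
        T result = item;
        for (Function<T, T> delegate : delegates) {
            result = delegate.apply(result);
            if (result == null) {
                return null;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Function<String, String> appendSuffix = s -> s + "a";
        Function<String, String> toUpper = s -> s.toUpperCase();

        // Same delegates, different order, different result.
        System.out.println(processInOrder(Arrays.asList(appendSuffix, toUpper), "x")); // prints XA
        System.out.println(processInOrder(Arrays.asList(toUpper, appendSuffix), "x")); // prints Xa
    }
}
```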