Overview

This section explains how to create a chunk model job. For the architecture of the chunk model, refer to Spring Batch architecture.

The components of a chunk model job are explained first.

Components

The components of a chunk model job are shown below. A job is implemented by combining these components in the job Bean definition file.

Components of chunk model job
Sr. No. Name Role Mandatory settings Mandatory implementation

1

ItemReader

Interface for fetching data from various resources.
Spring Batch provides implementations for flat files and databases,
so users usually do not need to create their own.

2

ItemProcessor

Interface for processing data between input and output.
Users implement this interface, when required, to hold the business logic.

3

ItemWriter

Interface for writing data to various resources.
It is the counterpart of ItemReader.
Spring Batch provides implementations for flat files and databases,
so users usually do not need to create their own.

The key points of this table are as follows.

  • If data is simply transferred from an input resource to an output resource, the job can be implemented with configuration alone.

  • ItemProcessor needs to be implemented only when it is required.

The following sections explain how to implement a job using these components.

How to use

How to implement a chunk model job is explained in the following order: job configuration, then implementation of components.

Job configuration

Define how the elements that constitute a chunk model job are combined in the Bean definition file. An example is shown below, followed by an explanation of how the components relate to each other.

Example of Bean definition file (Chunk model)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <!-- (1) -->
    <import resource="classpath:META-INF/spring/job-base-context.xml"/>

    <!-- (2) -->
    <context:annotation-config/>

    <!-- (3) -->
    <context:component-scan
        base-package="org.terasoluna.batch.functionaltest.app.common" />

    <!-- (4) -->
    <mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
        factory-ref="jobSqlSessionFactory"/>

    <!-- (5) -->
    <bean id="reader"
          class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
          p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

    <!-- (6) -->
    <!-- Item Processor -->
    <!-- The ItemProcessor is defined as a Bean by annotation, so it is not defined here -->

    <!-- (7) -->
    <bean id="writer"
          class="org.springframework.batch.item.file.FlatFileItemWriter"
          scope="step"
          p:resource="file:#{jobParameters[outputFile]}">
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                          p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- (8) -->
    <batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (9) -->
        <batch:step id="jobCustomerList01.step01"> <!-- (10) -->
            <batch:tasklet transaction-manager="jobTransactionManager"> <!-- (11) -->
                <batch:chunk reader="reader"
                             processor="processor"
                             writer="writer"
                             commit-interval="10" /> <!-- (12) -->
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
Configuration of ItemProcessor implementation class
@Component("processor") // (6)
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
  // omitted
}
Sr. No. Explanation

(1)

Import the settings that load the Bean definitions always required when using TERASOLUNA Batch 5.x.

(2)

Enable Bean definition by annotation. Use it together with (3) when implementing ItemProcessor, Listener, and so on.

(3)

Set the base package targeted by component scan. Use it together with (2) when defining Beans by annotation.

(4)

MyBatis-Spring configuration.
Scan the package that contains the repository (Mapper) interfaces and register them as Beans. The factory-ref attribute specifies the SqlSessionFactory to use. For details, refer to Database access.

(5)

ItemReader configuration.
For the details of ItemReader, refer to Database access and File access.

(6)

ItemProcessor configuration.
Because the ItemProcessor can be defined by annotation through (2) and (3), it does not need to be defined in the Bean definition file.

(7)

ItemWriter configuration.
For the details of ItemWriter, refer to Database access and File access.

(8)

Job configuration.
The value of the id attribute must be unique across all jobs included in one batch application.

(9)

JobRepository configuration.
The value of the job-repository attribute should be fixed to jobRepository unless there is a special reason.
This allows all jobs to be managed by a single JobRepository. The Bean definition of jobRepository is resolved by the import in (1).

(10)

Step configuration.
The value of the id attribute must be unique within one job.
If the job id set in (8) is added as a prefix, the step id also becomes unique across all jobs in the batch application, which is useful in situations such as log output and identifying where an error occurred.
Therefore, use the format <Job id>.<step name>.

(11)

Tasklet configuration.
The value of the transaction-manager attribute should be fixed to jobTransactionManager unless there is a special reason.
The transaction is then managed for each commit-interval set in (12). For details, refer to Transaction control.
The Bean definition of jobTransactionManager is resolved by the import in (1).

(12)

Chunk model job configuration.
Specify the Bean IDs of the ItemReader, ItemProcessor and ItemWriter defined above in the reader, processor and writer attributes respectively.
Set the number of input records per chunk in the commit-interval attribute.
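
The relationship between reader, processor, writer and commit-interval in (12) can be pictured with a small plain-Java sketch. It is an illustration only and does not use Spring Batch classes: the class name ChunkLoopSketch and the method run are hypothetical, and the place where a real job would write and commit is only marked by a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of chunk-oriented processing (not Spring Batch code).
class ChunkLoopSketch {

    // Reads up to commitInterval items, processes them, then hands the chunk to the "writer".
    // Returns every written chunk so that the grouping can be inspected.
    static <I, O> List<List<O>> run(Iterator<I> reader,
                                    Function<I, O> processor,
                                    int commitInterval) {
        List<List<O>> writtenChunks = new ArrayList<>();
        while (reader.hasNext()) {
            // 1. Read: collect at most commitInterval input items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process: convert each input item into an output item.
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                outputs.add(processor.apply(input));
            }
            // 3. Write: a real job would write the chunk and commit the transaction here.
            writtenChunks.add(outputs);
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7);
        // Prints [[item-1, item-2, item-3], [item-4, item-5, item-6], [item-7]]
        System.out.println(run(input.iterator(), n -> "item-" + n, 3));
    }
}
```

With 7 input records and a commit-interval of 3, the sketch produces three chunks, the last of which holds the single remaining record.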

Tuning of commit-interval

commit-interval is a key performance tuning point in a chunk model job.

The above example uses 10 records, but the appropriate value depends on the available machine resources and the characteristics of the job. For a job that processes data while accessing multiple resources, throughput may level off at around 10 to 100 records. On the other hand, for a job that simply transfers data between input and output resources in 1:1 correspondence, throughput may keep increasing even at 5000 or 10000 records.

Set commit-interval provisionally to 100 records when implementing the job, and then tune each job according to the results of performance measurement.
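
To get a feel for what a given commit-interval means, the number of chunks that contain data can be estimated by ceiling division. The following is a minimal sketch with hypothetical names (CommitIntervalSketch, chunkCount); the commit count actually reported by the framework may differ slightly, and the figure says nothing about throughput, which still has to be measured.

```java
// Estimates how many non-empty chunks a job needs for a given commit-interval.
class CommitIntervalSketch {

    static long chunkCount(long totalItems, int commitInterval) {
        if (totalItems < 0 || commitInterval <= 0) {
            throw new IllegalArgumentException("totalItems must be >= 0 and commitInterval > 0");
        }
        // Ceiling division: a partially filled last chunk still counts as one chunk.
        return (totalItems + commitInterval - 1) / commitInterval;
    }

    public static void main(String[] args) {
        long totalItems = 1_000_000L;
        for (int interval : new int[] {10, 100, 5000}) {
            System.out.println("commit-interval=" + interval
                    + " -> chunks=" + chunkCount(totalItems, interval));
        }
    }
}
```

For one million input records, the sketch gives 100000 chunks at an interval of 10, 10000 at 100, and 200 at 5000.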

Implementation of components

This section mainly explains how to implement ItemProcessor.

For the other components (ItemReader and ItemWriter), refer to Database access and File access.

Implementation of ItemProcessor

This section explains how to implement ItemProcessor.

As the interface below shows, ItemProcessor creates one record for the output resource from one record fetched from the input resource. In other words, ItemProcessor is where the business logic for a single record is implemented.

ItemProcessor interface
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

The type parameters I and O of the interface can be the same type or different types, as shown below. Using the same type means partially modifying the input data. Using different types means generating output data from the input data.

Example of implementation of ItemProcessor (input and output are of the same type)
@Component
public class AmountUpdateItemProcessor implements
        ItemProcessor<SalesPlanDetail, SalesPlanDetail> {

    @Override
    public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
        // Partially modify the input item and pass the same object to the writer.
        item.setAmount(new BigDecimal("1000"));
        return item;
    }
}
Example of implementation of ItemProcessor (input and output are of different types)
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        // Look up the customer to obtain the branch in charge (one DB access per record).
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        // Build the output item from the input item and the customer data.
        SalesPlanDetail writeItem = new SalesPlanDetail();
        writeItem.setBranchId(customer.getChargeBranchId());
        writeItem.setYear(readItem.getYear());
        writeItem.setMonth(readItem.getMonth());
        writeItem.setCustomerId(readItem.getCustomerId());
        writeItem.setAmount(readItem.getAmount());
        return writeItem;
    }
}
Returning null from ItemProcessor

Returning null from ItemProcessor means that the data is not passed to the subsequent process (Writer). In other words, the data is filtered out. This can be used effectively to validate input data. For details, refer to Input check.
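
The filtering effect of returning null can be sketched in plain Java as follows. This is only an illustration under stated assumptions: ItemProcessor is redeclared locally with the same shape as the interface shown earlier so that the example compiles without Spring Batch, and PositiveAmountFilter, NullFilterSketch and processAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as the ItemProcessor interface shown earlier.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical processor: keeps positive amounts and filters out everything else.
class PositiveAmountFilter implements ItemProcessor<Integer, Integer> {
    @Override
    public Integer process(Integer amount) {
        // Returning null tells the caller not to pass this item to the writer.
        return (amount != null && amount > 0) ? amount : null;
    }
}

class NullFilterSketch {

    // Mimics the framework behavior described above: null results are not collected.
    static <I, O> List<O> processAll(List<I> items, ItemProcessor<I, O> processor) {
        List<O> toWrite = new ArrayList<>();
        for (I item : items) {
            O result;
            try {
                result = processor.process(item);
            } catch (Exception e) {
                throw new IllegalStateException("processing failed for " + item, e);
            }
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        // Prints [5, 8]: the non-positive amounts never reach the writer.
        System.out.println(processAll(List.of(5, -1, 0, 8), new PositiveAmountFilter()));
    }
}
```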

Increasing the processing throughput of ItemProcessor

As shown in the previous implementation example, an ItemProcessor implementation class sometimes has to access resources such as databases and files. Because ItemProcessor is executed for each input record, even a small amount of I/O per record adds up to a large amount of I/O over the whole job. Suppressing I/O as much as possible is therefore important for increasing processing throughput.

One method is to load the required data into memory in advance by using a Listener, described later, so that most of the processing in ItemProcessor completes using only CPU and memory. However, because this consumes a large amount of memory per job, not everything can be stored in memory. Decide which data to keep in memory based on I/O frequency and data size.

This point is also covered in Input/Output of data.
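
The idea of loading data once and then processing from memory can be sketched as follows. All names here are hypothetical (CountingCustomerSource, CachedBranchProcessor, loadCache), the ItemProcessor interface is redeclared locally so that the example runs without Spring Batch, and the data source is an in-memory stand-in that only counts how often it is queried. In a real job, loadCache would typically be called from a listener before the step starts.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in with the same shape as the ItemProcessor interface shown earlier.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical data source standing in for a repository; counts its queries.
class CountingCustomerSource {
    int queryCount = 0;

    Map<String, String> findAllBranchIds() {
        queryCount++;
        Map<String, String> rows = new HashMap<>();
        rows.put("C001", "B01");
        rows.put("C002", "B02");
        return rows;
    }
}

// Looks up the branch ID from memory instead of querying once per record.
class CachedBranchProcessor implements ItemProcessor<String, String> {
    private final Map<String, String> branchIdByCustomerId = new HashMap<>();

    // Assumed to be called once before processing starts.
    void loadCache(CountingCustomerSource source) {
        branchIdByCustomerId.putAll(source.findAllBranchIds());
    }

    @Override
    public String process(String customerId) {
        // No I/O here; an unknown customer yields null and is therefore filtered.
        return branchIdByCustomerId.get(customerId);
    }
}

class CacheSketch {
    public static void main(String[] args) {
        CountingCustomerSource source = new CountingCustomerSource();
        CachedBranchProcessor processor = new CachedBranchProcessor();
        processor.loadCache(source);
        for (String id : new String[] {"C001", "C002", "C001"}) {
            System.out.println(id + " -> " + processor.process(id));
        }
        System.out.println("queries issued: " + source.queryCount);
    }
}
```

The trade-off is the one described above: the whole lookup table sits in memory for the lifetime of the step, so this approach suits small, frequently accessed data.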

Use multiple ItemProcessors at the same time

To apply a general-purpose ItemProcessor to each job, link it with the job-specific ItemProcessor by using CompositeItemProcessor, which is provided by Spring Batch.

Linking of multiple ItemProcessor by CompositeItemProcessor
<bean id="processor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <ref bean="commonItemProcessor"/>
            <ref bean="businessLogicItemProcessor"/>
        </list>
    </property>
</bean>

Note that the ItemProcessors are executed in the order in which they are listed in the delegates property.
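
Why the order matters can be shown with a small plain-Java sketch. It is not the Spring Batch implementation: ItemProcessor is redeclared locally, ChainSketch and applyInOrder are hypothetical names, and the two delegates are simple lambdas that append a marker so that the execution order becomes visible.

```java
import java.util.List;

// Local stand-in with the same shape as the ItemProcessor interface shown earlier.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

class ChainSketch {

    // Passes the item through each delegate in list order.
    // If a delegate returns null, the remaining delegates are skipped.
    static String applyInOrder(List<ItemProcessor<String, String>> delegates, String item) {
        String current = item;
        for (ItemProcessor<String, String> delegate : delegates) {
            if (current == null) {
                return null;
            }
            try {
                current = delegate.process(current);
            } catch (Exception e) {
                throw new IllegalStateException("delegate failed", e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        ItemProcessor<String, String> common = s -> s + "-common";
        ItemProcessor<String, String> business = s -> s + "-business";

        // Prints item-common-business
        System.out.println(applyInOrder(List.of(common, business), "item"));
        // Prints item-business-common
        System.out.println(applyInOrder(List.of(business, common), "item"));
    }
}
```

Swapping the two entries changes the result, which is why a common processor that prepares data for the business logic has to be listed first.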