Overview

チャンクモデルジョブの作成方法について説明する。 チャンクモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照のこと。

ここでは、チャンクモデルジョブの構成要素について説明する。

構成要素

チャンクモデルジョブの構成要素を以下に示す。 これらの構成要素をBean定義にて組み合わせることで1つのジョブを実現する。

チャンクモデルジョブの構成要素
項番 名称 役割 設定必須 実装必須

1

ItemReader

様々なリソースからデータを取得するためのインターフェース。
Spring Batchにより、フラットファイルや
データベースを対象とした実装が提供されているため、ユーザにて作成する必要はない。

2

ItemProcessor

入力から出力へデータを加工するためのインターフェース。
ユーザは必要に応じてこのインターフェースをimplementsし、ビジネスロジックを実装する。

3

ItemWriter

様々なリソースへデータを出力するためのインターフェース。
ItemReaderと対になるインターフェースと考えてよい。
Spring Batchにより、フラットファイルや
データベースのための実装が提供されているため、ユーザにて作成する必要はない。

この表のポイントは以下である。

  • 入力リソースから出力リソースへ単純にデータを移し替えるだけであれば、設定のみで実現できる。

  • ItemProcessorは、必要が生じた際にのみ実装すればよい。

以降、これらの構成要素を用いたジョブの実装方法について説明する。

How to use

ここでは、実際にチャンクモデルジョブを実装する方法について、以下の順序で説明する。

ジョブの設定

Bean定義ファイルにて、チャンクモデルジョブを構成する要素の組み合わせ方を定義する。 以下に例を示し、構成要素の繋がりを説明する。

Bean定義ファイルの例(チャンクモデル)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <!-- (1) -->
    <import resource="classpath:META-INF/spring/job-base-context.xml"/>

    <!-- (2) -->
    <context:annotation-config/>

    <!-- (3) -->
    <context:component-scan
        base-package="org.terasoluna.batch.functionaltest.app.common" />

    <!-- (4) -->
    <mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
        factory-ref="jobSqlSessionFactory"/>

    <!-- (5) -->
    <bean id="reader"
          class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
          p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

    <!-- (6) -->
    <!-- Item Processor -->
    <!-- Item Processor in order that based on the Bean defined by the annotations, not defined here -->

    <!-- (7) -->
    <bean id="writer"
          class="org.springframework.batch.item.file.FlatFileItemWriter"
          scope="step"
          p:resource="file:#{jobParameters[outputFile]}">
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                          p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- (8) -->
    <batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (9) -->
        <batch:step id="jobCustomerList01.step01"> <!-- (10) -->
            <batch:tasklet transaction-manager="jobTransactionManager"> <!-- (11) -->
                <batch:chunk reader="reader"
                             processor="processor"
                             writer="writer"
                             commit-interval="10" /> <!-- (12) -->
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
ItemProcessor実装クラスの設定
@Component("processor") // (6)
public class CustomerProcessor implement ItemProcessor<Customer, Customer> {
  // omitted
}
項番 説明

(1)

TERASOLUNA Batch 5.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。

(2)

アノテーションによるBean定義の有効化を行う。ItemProcessorやListenerなどを実装する際に、(3)と合わせて利用する。

(3)

コンポーネントスキャン対象のベースパッケージを設定する。アノテーションによるBean定義を行う場合は、(2)と合わせて利用する。

(5)

ItemReaderの設定。
ItemReaderの詳細は、 データベースアクセスファイルアクセス を参照のこと。

(6)

ItemProcessorは、(2),(3)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。

(7)

ItemWriterの設定。
ItemWriterの詳細は、 データベースアクセスファイルアクセス を参照のこと。

(8)

ジョブの設定。
id属性に設定する値は、1つのバッチアプリケーションに含まれる全ジョブの範囲において、一意とすること。

(9)

JobRepositoryの設定。
job-repository属性に設定する値は、特別な理由がない限りjobRepository固定とすること。
これにより、すべてのジョブが1つのJobRepositoryで管理できる。 jobRepositoryのBean定義は、(1)により解決する。

(10)

ステップの設定。
id属性に設定する値は、1つのジョブ内で一意とすること。
接頭辞として(8)で設定したid属性を付加した形式にし、 ステップもジョブと同様にバッチアプリケーションに含まれる全ジョブの範囲において一意とすると、 ログ出力や異常発生時の特定をはじめ、様々な場面で有効活用できる。
よって、特別な理由がない限り<ジョブのid>.<step名>とすること。

(11)

タスクレットの設定。
transaction-manager属性に設定する値は、特別な理由がない限りjobTransactionManager固定とすること。
これにより、(12)のcommit-intervalごとにトランザクションが管理される。 詳細については、トランザクション制御を参照のこと。
jobTransactionManagerのBean定義は、(1)により解決する。

(12)

チャンクモデルジョブの設定。
readerprocessorwriterの各属性に対し、 前段までで定義したItemReaderItemProcessorItemWriterのBeanIDを指定する。
commit-interval属性に1チャンクあたりの入力データ件数を設定する。

commit-intervalのチューニング

commit-intervalはチャンクモデルジョブにおける、性能上のチューニングポイントである。

前述の例では10件としているが、利用できるマシンリソースやジョブの特性によって適切な件数は異なる。 複数のリソースにアクセスしてデータを加工するジョブであれば10件から100件程度で処理スループットが頭打ちになることもある。 一方、入出力リソースが1:1対応しておりデータを移し替える程度のジョブであれば5000件や10000件でも処理スループットが伸びることがある。

ジョブ実装時のcommit-intervalは100件程度で仮置きしておき、 その後に実施した性能測定の結果に応じてジョブごとにチューニングするとよい。

コンポーネントの実装

ここでは主に、ItemProcessorを実装する方法について説明する。

他のコンポーネントについては、以下を参照のこと。

ItemProcessorの実装

ItemProcessorの実装方法を説明する。

ItemProcessorは、以下のインターフェースが示すとおり、入力リソースから取得したデータ 1件 を元に、 出力リソースに向けたデータ 1件 を作成する役目を担う。 つまり、ItemProcessorはデータ 1件 に対するビジネスロジックを実装する箇所、と言える。

ItemProcessorインターフェース
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

なお、インターフェースが示すIOは以下のとおり同じ型でも異なる型でもよい。 同じ型であれば入力データを一部修正することを意味し、 異なる型であれば入力データを元に出力データを生成することを意味する。

ItemProcessor実装例(入出力が同じ型)
@Component
public class AmountUpdateItemProcessor implements
        ItemProcessor<SalesPlanDetail, SalesPlanDetail> {

    @Override
    public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
        item.setAmount(new BigDecimal("1000"));
        return item;
    }
}
ItemProcessor実装例(入出力が異なる型)
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        SalesPlanDetail writeItem = new SalesPlanDetail();
        writeItem.setBranchId(customer.getChargeBranchId());
        writeItem.setYear(readItem.getYear());
        writeItem.setMonth(readItem.getMonth());
        writeItem.setCustomerId(readItem.getCustomerId());
        writeItem.setAmount(readItem.getAmount());
        return writeItem;
    }
}
ItemProcessorからnullを返却することの意味

ItemProcessorからnullを返却することは、当該データを後続処理(Writer)に渡さないことを意味し、 言い換えるとデータをフィルタすることになる。 これは、入力データの妥当性検証を実施する上で有効活用できる。 詳細については、入力チェックを参照のこと。

ItemProcessorの処理スループットをあげるには

前述した実装例のように、ItemProcessorの実装クラスではDBやファイルを始めとしたリソースにアクセスしなければならないことがある。 ItemProcessorは入力データ1件ごとに実行されるため、I/Oが少しでも発生するとジョブ全体では大量のI/Oが発生することになる。 そのため、極力I/Oを抑えることが処理スループットをあげる上で重要となる。

1つの方法として、後述のListenerを活用することで事前に必要なデータをメモリ上に確保しておき、 ItemProcessorにおける処理の大半を、CPU/メモリ間で完結するように実装する手段がある。 ただし、1ジョブあたりのメモリを大量に消費することにも繋がるので、何でもメモリ上に確保すればよいわけではない。 I/O回数やデータサイズを元に、メモリに格納するデータを検討すること。

この点については、データの入出力でも紹介する。

複数のItemProcessorを同時に利用する

汎用的なItemProcessorを用意し、個々のジョブに適用したい場合は、 Spring Batchが提供するCompositeItemProcessorを利用し連結することで実現できる。

CompositeItemProcessorによる複数ItemProcessorの連結
<bean id="processor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <ref bean="commonItemProcessor"/>
            <ref bean="businessLogicItemProcessor"/>
        </list>
    </property>
</bean>

delegates属性に指定した順番に処理されることに留意すること。