TERASOLUNA Batch Framework for Java (5.x) Development Guideline - version 5.0.1.RELEASE, 2017-9-27
> INDEX

Overview

TERASOLUNA Batch 5.xでは、データベースアクセスの方法として、MyBatis3(以降、「MyBatis」と呼ぶ)を利用する。 MyBatisによるデータベースアクセスの基本的な利用方法は、TERASOLUNA Server 5.x 開発ガイドラインの以下を参照してほしい。

本節では、TERASOLUNA Batch 5.x特有の使い方を中心に説明する。

Linux環境でのOracle JDBC利用時の留意事項について

Linux環境でのOracle JDBCを利用時は、Oracle JDBCが使用するOSの乱数生成器によるロックが発生する。 そのため、ジョブを並列実行しようとしても逐次実行になる事象や片方のコネクションがタイムアウトする事象が発生する。
本事象に対する2パターンの回避策を以下に示す。

  • Javaコマンド実行時に、システムプロパティで以下を設定する。

    • -Djava.security.egd=file:///dev/urandom

  • ${JAVA_HOME}/jre/lib/security/java.security内のsecurerandom.source=/dev/randomsecurerandom.source=/dev/urandomに変更する。

How to use

TERASOLUNA Batch 5.xでのデータベースアクセス方法を説明する。

なお、チャンクモデルとタスクレットモデルにおけるデータベースアクセス方法の違いに留意する。

TERASOLUNA Batch 5.xでのデータベースアクセスは、以下の2つの方法がある。
これらはデータベースアクセスするコンポーネントによって使い分ける。

  1. MyBatis用のItemReaderおよびItemWriterを利用する。

    • チャンクモデルでのデータベースアクセスによる入出力で使用する。

      • org.mybatis.spring.batch.MyBatisCursorItemReader

      • org.mybatis.spring.batch.MyBatisBatchItemWriter

  2. Mapperインターフェースを利用する

    • チャンクモデルでのビジネスロジック処理で使用する。

      • ItemProcessor実装で利用する。

    • タスクレットモデルでのデータベースアクセス全般で使用する。

      • Tasklet実装で利用する。

共通設定

データベースアクセスにおいて必要な共通設定について説明を行う。

データソースの設定

TERASOLUNA Batch 5.xでは、2つのデータソースを前提としている。 launch-context.xmlでデフォルト設定している2つのデータソースを示す。

データソース一覧
データソース名 説明

adminDataSource

Spring BatchやTERASOLUNA Batch 5.xが利用するデータソース
JobRepositoryや非同期実行(DBポーリング)で利用している。

jobDataSource

ジョブが利用するデータソース

以下に、launch-context.xmlと接続情報のプロパティを示す。
これらをユーザの環境に合わせて設定すること。

resources\META-INF\spring\launch-context.xml
<!-- (1) -->
<bean id="adminDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
      destroy-method="close"
      p:driverClassName="${admin.h2.jdbc.driver}"
      p:url="${admin.h2.jdbc.url}"
      p:username="${admin.h2.jdbc.username}"
      p:password="${admin.h2.jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false"/>

<!-- (2) -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource" 
      destroy-method="close"
      p:driverClassName="${jdbc.driver}"
      p:url="${jdbc.url}"
      p:username="${jdbc.username}"
      p:password="${jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false" />
batch-application.properties
# (3)
# Admin DataSource settings.
admin.h2.jdbc.driver=org.h2.Driver
admin.h2.jdbc.url=jdbc:h2:mem:batch;DB_CLOSE_DELAY=-1
admin.h2.jdbc.username=sa
admin.h2.jdbc.password=

# (4)
# Job DataSource settings.
jdbc.driver=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/postgres
jdbc.username=postgres
jdbc.password=postgres
説明
項番 説明

(1)

adminDataSourceの定義。(3)の接続情報が設定される。

(2)

jobDataSourceの定義。(4)の接続情報が設定される。

(3)

adminDataSourceで利用するデータベースへの接続情報
この例では、H2を利用している。

(4)

jobDataSourceで利用するデータベースへの接続情報
この例では、PostgreSQLを利用している。

MyBatisの設定

TERASOLUNA Batch 5.xで、MyBatisの設定をする上で重要な点について説明をする。

バッチ処理を実装する際の重要なポイントの1つとして「大量のデータを一定のリソースで効率よく処理する」が挙げられる。
これに関する設定を説明する。

  • fetchSize

    • 一般的なバッチ処理では、大量のデータを処理する際の通信コストを低減するために、 JDBCドライバに適切なfetchSizeを指定することが必須である。 fetchSizeとは、JDBCドライバとデータベース間とで1回の通信で取得するデータ件数を設定するパラメータである。 この値は出来る限り大きい値を設定することが望ましいが、大きすぎるとメモリを圧迫するため、注意が必要である。 ユーザにてチューニングする必要がある箇所と言える。

    • MyBatisでは、全クエリ共通の設定としてdefaultFetchSizeを設定することができ、さらにクエリごとのfetchSize設定で上書きできる。

  • executorType

    • 一般的なバッチ処理では、同一トランザクション内で同じSQLを全データ件数/fetchSizeの回数分実行することになる。 この際、都度ステートメントを作成するのではなく再利用することで効率よく処理できる。

    • MyBatisの設定における、defaultExecutorTypeREUSEを設定することでステートメントの再利用ができ、 処理スループット向上に寄与する。

    • 大量のデータを一度に更新する場合、JDBCのバッチ更新を利用することで性能向上が期待できる。
      そのため、MyBatisBatchItemWriterで利用するSqlSessionTemplateには、
      executorTypeに(REUSEではなく)BATCHが設定されている。

TERASOLUNA Batch 5.xでは、同時に2つの異なるExecutorTypeが存在する。 一方のExecutorTypeで実装する場合が多いと想定するが、併用時は特に注意が必要である。 この点は、ItemReader・ItemWriter以外のデータベースアクセスにて詳しく説明する。

MyBatisのその他のパラメータ

その他のパラメータに関しては以下リンクを参照し、 アプリケーションの特性にあった設定を行うこと。
http://www.mybatis.org/mybatis-3/configuration.html

以下にデフォルト提供されている設定を示す。

META-INF/spring/launch-context.xml
<bean id="jobSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="jobDataSource">
    <!-- (1) -->
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

<!-- (2) -->
<bean id="batchModeSqlSessionTemplate"
      class="org.mybatis.spring.SqlSessionTemplate"
      c:sqlSessionFactory-ref="jobSqlSessionFactory"
      c:executorType="BATCH"/>
説明
項番 説明

(1)

MyBatisの各種設定を行う。
デフォルトでは、fetchSizeを1000に設定している。

(2)

MyBatisBatchItemWriterのために、executorTypeBATCHSqlSessionTemplateを定義している。

adminDataSourceを利用したSqlSessionFactoryの定義箇所について

同期実行をする場合は、adminDataSourceを利用したSqlSessionFactoryは不要であるため、定義がされていない。 非同期実行(DBポーリング)を利用する場合、ジョブ要求テーブルへアクセスするために META-INF/spring/async-batch-daemon.xml内に定義されている。

META-INF/spring/async-batch-daemon.xml
<bean id="adminSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="adminDataSource" >
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

Mapper XMLの定義

TERASOLUNA Batch 5.x特有の説明事項はないので、TERASOLUNA Server 5.x 開発ガイドラインの データベースアクセス処理の実装を参照してほしい。

MyBatis-Springの設定

MyBatis-Springが提供するItemReaderおよびItemWriterを使用する場合、MapperのConfigで使用するMapper XMLを設定する必要がある。

設定方法としては、以下の2つが考えられる。

  1. 共通設定として、すべてのジョブで使用するMapper XMLを登録する。

    • META-INF/spring/launch-context.xmlにすべてのMapper XMLを記述することになる。

  2. 個別設定として、ジョブ単位で利用するMapper XMLを登録する。

    • META-INF/jobs/配下のBean定義に、個々のジョブごとに必要なMapper XMLを記述することになる。

共通設定をしてしまうと、同期実行をする際に実行するジョブのMapper XMLだけでなく、その他のジョブが使用するMapper XMLも読み込んでしまうために以下に示す弊害が生じる。

  • ジョブの起動までに時間がかかる

  • メモリリソースの消費が大きくなる

これを回避するために、TERASOLUNA Batch 5.xでは、個別設定として、個々のジョブ定義でそのジョブが必要とするMapper XMLだけを指定する設定方法を採用する。

基本的な設定方法については、TERASOLUNA Server 5.x 開発ガイドラインの MyBatis-Springの設定を参照してほしい。

TERASOLUNA Batch 5.xでは、複数のSqlSessionFactoryおよびSqlSessionTemplateが定義されているため、 どれを利用するか明示的に指定する必要がある。
基本的にはjobSqlSessionFactoryを指定すればよい。

以下に設定例を示す。

META-INF/jobs/common/jobCustomerList01.xml
<!-- (1) -->
<mybatis:scan
    base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
    factory-ref="jobSqlSessionFactory"/>
説明
項番 説明

(1)

<mybatis:scan>factory-ref属性にjobSqlSessionFactoryを設定する。

ItemReaderにおけるデータベースアクセス

ここではItemReaderによるデータベースアクセスについて説明する。

MyBatisのItemReader

MyBatis-Springが提供するItemReaderとして下記の2つが存在する。

  • org.mybatis.spring.batch.MyBatisCursorItemReader

  • org.mybatis.spring.batch.MyBatisPagingItemReader

MyBatisPagingItemReaderは、TERASOLUNA Server 5.x 開発ガイドラインの Entityのページネーション検索(SQL絞り込み方式)で 説明している仕組みを利用したItemReaderである。
一定件数を取得した後に再度SQLを発行するため、データの一貫性が保たれない可能性がある。 そのため、バッチ処理で利用するには危険であることから、TERASOLUNA Batch 5.xでは原則使用しない。
TERASOLUNA Batch 5.xではMyBatisCursorItemReaderのみを利用する。

TERASOLUNA Batch 5.xでは、MyBatis-Springの設定で説明したとおり、 mybatis:scanによって動的にMapper XMLを登録する方法を採用している。 そのため、Mapper XMLに対応するインターフェースを用意する必要がある。 詳細については、TERASOLUNA Server 5.x 開発ガイドラインの データベースアクセス処理の実装を参照。

MyBatisCursorItemReaderの利用例を以下に示す。

META-INF/jobs/common/jobCustomerList01.xml
<!-- (1) -->
<mybatis:scan
    base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
org/terasoluna/batch/functionaltest/app/repository/mst/CustomerRepository.xml
<!-- (5) -->
<mapper namespace="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository">

    <!-- (6) -->
    <select id="findAll"
            resultType="org.terasoluna.batch.functionaltest.app.model.mst.Customer">
        <![CDATA[
        SELECT
            customer_id AS customerId,
            customer_name AS customerName,
            customer_address AS customerAddress,
            customer_tel AS customerTel,
            charge_branch_id AS chargeBranchId,
            create_date AS createDate,
            update_date AS updateDate
        FROM
            customer_mst
        ORDER by
            charge_branch_id ASC, customer_id ASC
        ]]>
    </select>

    <!-- omitted -->
</mapper>
org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository
public interface CustomerRepository {
    // (7)
    List<Customer> findAll();

    // omitted.
}
説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisCursorItemReaderを定義する。

(3)

queryIdのプロパティに、(6)で定義しているSQLのIDを(5)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionFactory-refのプロパティに、アクセスするデータベースのSqlSessionFactoryを指定する。

(5)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。

ItemWriterにおけるデータベースアクセス

ここではItemWriterによるデータベースアクセスについて説明する。

MyBatisのItemWriter

MyBatis-Springが提供するItemWriterは以下の1つのみである。

  • org.mybatis.spring.batch.MyBatisBatchItemWriter

基本的な設定については、MyBatisのItemReaderと同じである。 MyBatisBatchItemWriterでは、MyBatisの設定で説明した batchModeSqlSessionTemplateを指定する必要がる。

MyBatisBatchItemWriterの定義例を以下に示す。

META-INF/jobs/common/jobSalesPlan01.xml
<!-- (1) -->
<mybatis:scan
    base-package="org.terasoluna.batch.functionaltest.app.repository.plan"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="detailWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- omitted -->
org/terasoluna/batch/functionaltest/app/repository/plan/SalesPlanDetailRepository.xml
<!-- (5) -->
<mapper namespace="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- (6) -->
    <insert id="create"
            parameterType="org.terasoluna.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        INSERT INTO
            sales_plan_detail(branch_id, year, month, customer_id, amount)
        VALUES (
            #{branchId}, #{year}, #{month}, #{customerId}, #{amount}
        )
        ]]>
    </insert>

    <!-- omitted -->
</mapper>
org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository
public interface SalesPlanDetailRepository {

    // (7)
    void create(SalesPlanDetail salesPlanDetail);

    // omitted.
}
説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisBatchItemWriterを定義する。

(3)

statementIdのプロパティに、(6)で定義しているSQLのIDを(5)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionTemplate-refのプロパティに、アクセスするデータベースのSessionTemplateを指定する。
指定するSessionTemplateは、executorTypeBATCHに設定されていることが必須である。

(5)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。

ItemReader・ItemWriter以外のデータベースアクセス

ItemReader・ItemWriter以外のデータベースアクセスについて説明する。

ItemReader・ItemWriter以外でデータベースアクセスするには、Mapperインターフェースを利用する。 Mapperインターフェースを利用するにあたって、TERASOLUNA Batch 5.xでは以下の制約を設けている。

Mapperインターフェースの利用可能な箇所
処理 ItemProcessor Tasklet リスナー

参照

利用可

利用可

利用可

更新

条件付で利用可

利用可

利用不可

ItemProcessorでの制約

MyBatisには、同一トランザクション内で2つ以上のExecutorTypeで実行してはいけないという制約がある。
「ItemWriterにMyBatisBatchItemWriterを使用する」と「ItemProcessorでMapperインターフェースを使用し参照更新をする」を 同時に満たす場合は、この制約に抵触する。
制約を回避するには、ItemProcessorではExecutorTypeBATCHのMapperインターフェースによって データベースアクセスすることになる。
加えて、MyBatisBatchItemWriterではSQL実行後のステータスチェックにより、自身が発行したSQLかどうかチェックしているのだが、 当然ItemProcessorによるSQL実行は管理できないためエラーが発生してしまう。
よって、MyBatisBatchItemWriterを利用している場合は、Mapperインターフェースによる更新はできなくなり、参照のみとなる。

MyBatisBatchItemWriterのエラーチェックを無効化する設定ができるが、予期せぬ動作が起きる可能性があるため無効化は禁止する。

Taskletでの制約

Taskletでは、Mapperインターフェースを利用することが基本であるため、ItemProcessorのような影響はない。
MyBatisBatchItemWriterをInjectして利用することも考えられるが、その場合はMapperインターフェース自体を BATCH設定で処理すればよい。つまり、Taskletでは、MyBatisBatchItemWriterをInjectして使う必要は基本的にない。

リスナーでの制約

リスナーでもItemProcessorでの制約と同じ制約が成立する。 加えて、リスナーでは、更新を必要とするユースケースが考えにくい。よって、リスナーでは、更新系処理を禁止する。

リスナーで想定される更新処理の代替
ジョブの状態管理

Spring BatchのJobRepositoryによって行われている

データベースへのログ出力

ログのAppenderで実施すべき。ジョブのトランザクションとも別管理する必要がある。

ItemProcessorでのデータベースアクセス

ItemProcessorでのデータベースアクセス例を説明する。

ItemProcessorでの実装例
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {

        // (2)
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        // (3)
        SalesPlanDetail writeItem = new SalesPlanDetail();
        writeItem.setBranchId(customer.getChargeBranchId());
        writeItem.setYear(readItem.getYear());
        writeItem.setMonth(readItem.getMonth());
        writeItem.setCustomerId(readItem.getCustomerId());
        writeItem.setAmount(readItem.getAmount());
        return writeItem;
    }
}
Bean定義
<!-- (2) -->
<mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>

<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<batch:job id="DBAccessByItemProcessor" job-repository="jobRepository">
    <batch:step id="DBAccessByItemProcessor.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- (4) -->
            <batch:chunk reader="reader"
                         processor="updateItemFromDBProcessor"
                         writer="writer" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>

MapperインターフェースとMapper XMLは省略する。

説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

Mapper XMLの登録を行う。
template-ref属性にBATCH設定されているbatchModeSqlSessionTemplateを指定することで、 ItemProcessorでのデータベースアクセスはBATCHとなる。 ここで、factory-ref="jobSqlSessionFactory"としてしまうと、前述の制約に抵触し、 MyBatisBatchItemWriter実行時に例外が発生してしまう。

(3)

MyBatisBatchItemWriterを定義する。
sqlSessionTemplate-refプロパティにBATCH設定されているbatchModeSqlSessionTemplateを指定する。

(4)

MapperインターフェースをInjectしたItemProcessorを設定する。

MyBatisCursorItemReader設定の補足

以下に示す定義例のように、MyBatisCursorItemReaderとMyBatisBatchItemWriterで異なるExecutorTypeを使用しても問題ない。 これは、MyBatisCursorItemReaderによるリソースのオープンが、トランザクション開始前に行われているからである。

<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="xxx"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="yyy"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

Taskletでのデータベースアクセス

Taskletでのデータベースアクセス例を説明する。

Taskletでの実装例
@Component
public class OptimisticLockTasklet implements Tasklet {

    // (1)
    @Inject
    ExclusiveControlRepository repository;

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        Branch branch = repository.branchFindOne(branchId); // (2)
        ExclusiveBranch exclusiveBranch = new ExclusiveBranch();

        exclusiveBranch.setBranchId(branch.getBranchId());
        exclusiveBranch.setBranchName(branch.getBranchName() + " - " + identifier);
        exclusiveBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
        exclusiveBranch.setBranchTel(branch.getBranchTel());
        exclusiveBranch.setCreateDate(branch.getUpdateDate());
        exclusiveBranch.setUpdateDate(new Timestamp(System.currentTimeMillis()));
        exclusiveBranch.setOldBranchName(branch.getBranchName());

        int result = repository.branchExclusiveUpdate(exclusiveBranch); // (3)

        return RepeatStatus.FINISHED;
    }
}
Bean定義
<!-- (4) -->
<mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository"
        factory-ref="jobSqlSessionFactory"/>

<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="taskletOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="optimisticLockTasklet"> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
</batch:job>

MapperインターフェースとMapper XMLは省略する。

説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

Mapperインターフェースで検索処理を実行する。

(3)

Mapperインターフェースで更新処理を実行する。

(4)

Mapper XMLの登録を行う。
factory-ref属性にREUSE設定されているjobSqlSessionFactoryを指定する。

(5)

MapperインターフェースをInjectしTaskletを設定する。

batchModeSqlSessionTemplateの利用

タスクレットモデルでの更新処理が多い場合は、factory-ref属性に`batchModeSqlSessionTemplateを設定する。 これにより、バッチ更新処理が行われるので、性能向上が期待できる。 ただし、バッチ更新の実行はflushを明示的に呼ぶ必要があるため、注意すること。 詳細は、 バッチモードのRepository利用時の注意点 を参照のこと。

リスナーでのデータベースアクセス

リスナーでのデータベースアクセスは他のコンポーネントと連携することが多い。 使用するリスナー及び実装方法によっては、Mapperインターフェースで取得したデータを、 他のコンポーネントへ引き渡す仕組みを追加で用意する必要がある。

ここでは一例として、StepExecutionListenerで ステップ実行前にデータを取得して、ItemProcessorで取得したデータを利用する例を示す。

リスナーでの実装例
public class CacheSetListener extends StepExecutionListenerSupport {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    // (2)
    @Inject
    CustomerCache cache;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // (3)
        for(Customer customer : customerRepository.findAll()) {
            cache.addCustomer(customer.getCustomerId(), customer);
        }
    }
}
ItemProcessorでの利用例
@Component
public class UpdateItemFromCacheProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (4)
    @Inject
    CustomerCache cache;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        Customer customer = cache.getCustomer(readItem.getCustomerId());  // (5)

        SalesPlanDetail writeItem = new SalesPlanDetail();

        // omitted.
        writerItem.setCustomerName(customer.getCustomerName); // (6)

        return writeItem;
    }
}
キャッシュクラス
// (7)
@Component
public class CustomerCache {

    Map<String, Customer> customerMap = new HashMap<>();

    public Customer getCustomer(String customerId) {
        return customerMap.get(customerId);
    }

    public void addCustomer(String id, Customer customer) {
        customerMap.put(id, customer);
    }
}
Bean定義
<!-- omitted -->

<!-- (8) -->
<mybatis:scan
        base-package="org.terasoluna.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="cacheSetListener"
      class="org.terasoluna.batch.functionaltest.ch05.dbaccess.CacheSetListener"/>

<!-- omitted -->

<batch:job id="DBAccessByItemListener" job-repository="jobRepository">
    <batch:step id="DBAccessByItemListener.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="updateItemFromCacheProcessor"
                         writer="writer" commit-interval="10"/> <!-- (10) -->
            <!-- (11) -->
            <batch:listeners>
                <batch:listener ref="cacheSetListener"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>
説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

Mapperインターフェースから取得したデータをキャッシュするためのBeanをInjectする。

(3)

リスナーにて、Mapperインターフェースからデータを取得してキャッシュする。
ここでは、StepExecutionListener#beforeStepにてステップ実行前にキャッシュを作成し、 以降の処理ではキャッシュを参照することで、I/Oを低減し処理効率を高めている。

(4)

(2)で設定したキャッシュと同じBeanをInjectする。

(5)

キャッシュから該当するデータを取得する。

(6)

更新データにキャッシュからのデータを反映する。

(7)

キャッシュクラスをコンポーネントとして実装する。
ここではBeanスコープはsingletonにしている。ジョブに応じて設定すること。

(8)

Mapper XMLの登録を行う。
template-ref属性にBATCHが設定されているbatchModeSqlSessionTemplateを指定する。

(9)

Mapperインターフェースを利用するリスナーを定義する。

(10)

キャッシュを利用するItemProcessorを指定する。

(11)

(9)で定義したリスナーを登録する。

リスナーでのSqlSessionFactoryの利用

上記の例では、batchModeSqlSessionTemplateを設定しているが、jobSqlSessionFactoryを設定してもよい。

チャンクのスコープ外で動作するリスナーについては、トランザクション外で処理されるため、 jobSqlSessionFactoryを設定しても問題ない。

How To Extend

CompositeItemWriterによる複数テーブルの更新

チャンクモデルで、1つの入力データに対して複数のテーブルへ更新を行いたい場合は、Spring Batchが提供するCompositeItemWriterを利用し、 各テーブルに対応したMyBatisBatchItemWriterを連結することで実現できる。

ここでは、売上計画と売上実績の2つのテーブルを更新する場合の実装例を示す。

ItemProcessorの実装例
@Component
public class SalesItemProcessor implements ItemProcessor<SalesPlanDetail, SalesDTO> {
    @Override
    public SalesDTO process(SalesPlanDetail item) throws Exception { // (1)

        SalesDTO salesDTO = new SalesDTO();

        // (2)
        SalesPerformanceDetail spd = new SalesPerformanceDetail();
        spd.setBranchId(item.getBranchId());
        spd.setYear(item.getYear());
        spd.setMonth(item.getMonth());
        spd.setCustomerId(item.getCustomerId());
        spd.setAmount(new BigDecimal(0L));
        salesDTO.setSalesPerformanceDetail(spd);

        // (3)
        item.setAmount(item.getAmount().add(new BigDecimal(1L)));
        salesDTO.setSalesPlanDetail(item);

        return salesDTO;
    }
}
DTOの実装例
public class SalesDTO implements Serializable {

    // (4)
    private SalesPlanDetail salesPlanDetail;

    // (5)
    private SalesPerformanceDetail salesPerformanceDetail;

    // omitted
}
MapperXMLの実装例
<mapper namespace="org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">

    <select id="findAll" resultType="org.terasoluna.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, customer_id AS customerId, amount
        FROM
            sales_plan_detail
        ORDER BY
            branch_id ASC, year ASC, month ASC, customer_id ASC
        ]]>
    </select>

    <!-- (6) -->
    <update id="update" parameterType="org.terasoluna.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        UPDATE
            sales_plan_detail
        SET
            amount = #{salesPlanDetail.amount}
        WHERE
            branch_id = #{salesPlanDetail.branchId}
        AND
            year = #{salesPlanDetail.year}
        AND
            month = #{salesPlanDetail.month}
        AND
            customer_id = #{salesPlanDetail.customerId}
        ]]>
    </update>

    <!-- (7) -->
    <insert id="create" parameterType="org.terasoluna.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        INSERT INTO
            sales_performance_detail(
                branch_id,
                year,
                month,
                customer_id,
                amount
            )
        VALUES (
            #{salesPerformanceDetail.branchId},
            #{salesPerformanceDetail.year},
            #{salesPerformanceDetail.month},
            #{salesPerformanceDetail.customerId},
            #{salesPerformanceDetail.amount}
        )
        ]]>
    </insert>

</mapper>
CompositeItemWriterの適用例
<!-- reader using MyBatisCursorItemReader -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- writer MyBatisBatchItemWriter -->
<!-- (8) -->
<bean id="planWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.update"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (9) -->
<bean id="performanceWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (10) -->
<bean id="writer" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
      <!-- (11)-->
        <list>
            <ref bean="performanceWriter"/>
            <ref bean="planWriter"/>
        </list>
    </property>
</bean>

<!-- (12) -->
<batch:job id="useCompositeItemWriter" job-repository="jobRepository">
    <batch:step id="useCompositeItemWriter.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="salesItemProcessor"
                         writer="writer" commit-interval="3"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
説明

項番

説明

(1)

入力データに対して2つのテーブルを更新するための各エンティティを保持するDTOを出力とするItemProcessorを実装する。
ItemWriterには2つのテーブルを更新するために異なるオブジェクトを渡すことはできないため、更新に必要なオブジェクトを集約するDTOを定義している。

(2)

売上実績(SalesPerformanceDetail)を新規作成するためのエンティティを作成し、DTOに格納する。

(3)

入力データでもある売上計画(SalesPlanDetail)を更新するため、入力データを更新してDTOに格納する。

(4)

売上計画(SalesPlanDetail)を保持するようにDTOに定義する。

(5)

売上実績(SalesPerformanceDetail)を保持するようにDTOに定義する。

(6)

DTOから取得した売上計画(SalesPlanDetail)で、売上計画テーブル(sales_plan_detail)を更新するSQLを定義する。

(7)

DTOから取得した売上実績(SalesPlanDetail)で、売上実績テーブル(sales_performance_detail)を新規作成するSQLを定義する。

(8)

売上計画テーブル(sales_plan_detail)を更新するMyBatisBatchItemWriterを定義する。

(9)

売上実績テーブル(sales_performance_detail)を新規作成するMyBatisBatchItemWriterを定義する。

(10)

(9),(10)を順番に実行するためにCompositeItemWriterを定義する。

(11)

<list>タグ内に(9),(10)を設定する。指定した順番にItemWriterが実行される。

(12)

チャンクのwriter属性に(10)で定義したBeanを指定する。processor属性には(1)のItemProcessorを指定する。

複数データソースへの出力(1ステップ)で説明した org.springframework.data.transaction.ChainedTransactionManagerと同時に使用することで複数データソースに対しての更新もできる。

また、CompositeItemWriterは、ItemWriter実装であれば連結できるので、 MyBatisBatchItemWriterとFlatFileItemWriterを設定することで、データベース出力とファイル出力を同時に行うこともできる。

TERASOLUNA Batch Framework for Java (5.x) Development Guideline - version 5.0.1.RELEASE, 2017-9-27