Overview
チャンクモデルジョブの作成方法について説明する。 チャンクモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照のこと。
ここでは、チャンクモデルジョブの構成要素について説明する。
構成要素
チャンクモデルジョブの構成要素を以下に示す。 これらの構成要素をBean定義にて組み合わせることで1つのジョブを実現する。
項番 | 名称 | 役割 | 設定必須 | 実装必須 |
---|---|---|---|---|
1 |
ItemReader |
様々なリソースからデータを取得するためのインターフェース。 |
||
2 |
ItemProcessor |
入力から出力へデータを加工するためのインターフェース。 |
||
3 |
ItemWriter |
様々なリソースへデータを出力するためのインターフェース。 |
この表のポイントは以下である。
-
入力リソースから出力リソースへ単純にデータを移し替えるだけであれば、設定のみで実現できる。
-
ItemProcessor
は、必要が生じた際にのみ実装すればよい。
以降、これらの構成要素を用いたジョブの実装方法について説明する。
How to use
ここでは、実際にチャンクモデルジョブを実装する方法について、以下の順序で説明する。
ジョブの設定
Bean定義ファイルにて、チャンクモデルジョブを構成する要素の組み合わせ方を定義する。 以下に例を示し、構成要素の繋がりを説明する。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">
<!-- (1) -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>
<!-- (2) -->
<context:component-scan
base-package="org.terasoluna.batch.functionaltest.app.common" />
<!-- (3) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
factory-ref="jobSqlSessionFactory"/>
<!-- (4) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- (5) -->
<!-- Item Processor -->
<!-- Item Processor in order that based on the Bean defined by the annotations, not defined here -->
<!-- (6) -->
<bean id="writer"
class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step"
p:resource="file:#{jobParameters['outputFile']}">
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
</property>
</bean>
</property>
</bean>
<!-- (7) -->
<batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (8) -->
<batch:step id="jobCustomerList01.step01"> <!-- (9) -->
<batch:tasklet transaction-manager="jobTransactionManager"> <!-- (10) -->
<batch:chunk reader="reader"
processor="processor"
writer="writer"
commit-interval="10" /> <!-- (11) -->
</batch:tasklet>
</batch:step>
</batch:job>
</beans>
@Component("processor") // (5)
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
// omitted.
}
項番 | 説明 |
---|---|
(1) |
TERASOLUNA Batch 5.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。 |
(2) |
コンポーネントスキャン対象のベースパッケージを設定する。 |
(3) |
MyBatis-Springの設定。 |
(4) |
ItemReaderの設定。 |
(5) |
ItemProcessorは、(2)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。 |
(6) |
ItemWriterの設定。 |
(7) |
ジョブの設定。 |
(8) |
|
(9) |
ステップの設定。 |
(10) |
タスクレットの設定。 |
(11) |
チャンクモデルジョブの設定。 |
commit-intervalのチューニング
前述の例では10件としているが、利用できるマシンリソースやジョブの特性によって適切な件数は異なる。 複数のリソースにアクセスしてデータを加工するジョブであれば10件から100件程度で処理スループットが頭打ちになることもある。 一方、入出力リソースが1:1対応しておりデータを移し替える程度のジョブであれば5000件や10000件でも処理スループットが伸びることがある。 ジョブ実装時の |
コンポーネントの実装
ここでは主に、ItemProcessorを実装する方法について説明する。
他のコンポーネントについては、以下を参照のこと。
-
ItemReader、ItemWriter
-
Listener
ItemProcessorの実装
ItemProcessorの実装方法を説明する。
ItemProcessorは、以下のインターフェースが示すとおり、入力リソースから取得したデータ1件を元に、 出力リソースに向けたデータ1件を作成する役目を担う。 つまり、ItemProcessorはデータ1件に対するビジネスロジックを実装する箇所、と言える。
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
なお、インターフェースが示すI
とO
は以下のとおり同じ型でも異なる型でもよい。
同じ型であれば入力データを一部修正することを意味し、
異なる型であれば入力データを元に出力データを生成することを意味する。
@Component
public class AmountUpdateItemProcessor implements
ItemProcessor<SalesPlanDetail, SalesPlanDetail> {
@Override
public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
item.setAmount(new BigDecimal("1000"));
return item;
}
}
@Component
public class UpdateItemFromDBProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
@Inject
CustomerRepository customerRepository;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
Customer customer = customerRepository.findOne(readItem.getCustomerId());
SalesPlanDetail writeItem = new SalesPlanDetail();
writeItem.setBranchId(customer.getChargeBranchId());
writeItem.setYear(readItem.getYear());
writeItem.setMonth(readItem.getMonth());
writeItem.setCustomerId(readItem.getCustomerId());
writeItem.setAmount(readItem.getAmount());
return writeItem;
}
}
ItemProcessorからnullを返却することの意味
ItemProcessorからnullを返却することは、当該データを後続処理(Writer)に渡さないことを意味し、 言い換えるとデータをフィルタすることになる。 これは、入力データの妥当性検証を実施する上で有効活用できる。 詳細については、入力チェックを参照のこと。 |
ItemProcessorの処理スループットをあげるには
前述した実装例のように、ItemProcessorの実装クラスではデータベースやファイルを始めとしたリソースにアクセスしなければならないことがある。 ItemProcessorは入力データ1件ごとに実行されるため、I/Oが少しでも発生するとジョブ全体では大量のI/Oが発生することになる。 そのため、極力I/Oを抑えることが処理スループットをあげる上で重要となる。 1つの方法として、後述のListenerを活用することで事前に必要なデータをメモリ上に確保しておき、 ItemProcessorにおける処理の大半を、CPU/メモリ間で完結するように実装する手段がある。 ただし、1ジョブあたりのメモリを大量に消費することにも繋がるので、何でもメモリ上に確保すればよいわけではない。 I/O回数やデータサイズを元に、メモリに格納するデータを検討すること。 この点については、データの入出力でも紹介する。 |
複数のItemProcessorを同時に利用する
汎用的なItemProcessorを用意し、個々のジョブに適用したい場合は、
Spring Batchが提供する CompositeItemProcessorによる複数ItemProcessorの連結
delegates属性に指定した順番に処理されることに留意すること。 |