Overview
TERASOLUNA Batch 5.xでは、データベースアクセスの方法として、MyBatis3(以降、「MyBatis」と呼ぶ)を利用する。 MyBatisによるデータベースアクセスの基本的な利用方法は、TERASOLUNA Server 5.x 開発ガイドラインの以下を参照してほしい。
本節では、TERASOLUNA Batch 5.x特有の使い方を中心に説明する。
本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。
How to use
TERASOLUNA Batch 5.xでのデータベースアクセス方法を説明する。
TERASOLUNA Batch 5.xでのデータベースアクセスは、以下の2つの方法がある。
これらはデータベースアクセスするコンポーネントによって使い分ける。
-
MyBatis用のItemReaderおよびItemReaderを利用する。
-
チャンクモデルでのデータベースアクセスによる入出力で使用する。
-
org.mybatis.spring.batch.MyBatisCursorItemReader
-
org.mybatis.spring.batch.MyBatisBatchItemWriter
-
-
-
Mapperインターフェースを利用する
-
チャンクモデルでのビジネスロジック処理で使用する。
-
ItemProcessor実装で利用する。
-
-
タスクレットモデルでのデータベースアクセス全般で使用する。
-
Tasklet実装で利用する。
-
-
共通設定
データベースアクセスにおいて必要な共通設定について説明を行う。
データソースの設定
TERASOLUNA Batch 5.xでは、2つのデータソースを前提としている。
launch-context.xml
でデフォルト設定している2つのデータソースを示す。
データソース名 | 説明 |
---|---|
|
Spring BatchやTERASOLUNA Batch 5.xが利用するデータソース |
|
ジョブが利用するデータソース |
以下に、launch-context.xmlと接続情報のプロパティを示す。
これらをユーザの環境に合わせて設定すること。
<!-- (1) -->
<bean id="adminDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close"
p:driverClassName="${admin.jdbc.driver}"
p:url="${admin.jdbc.url}"
p:username="${admin.jdbc.username}"
p:password="${admin.jdbc.password}"
p:maxTotal="10"
p:minIdle="1"
p:maxWaitMillis="5000"
p:defaultAutoCommit="false"/>
<!-- (2) -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close"
p:driverClassName="${jdbc.driver}"
p:url="${jdbc.url}"
p:username="${jdbc.username}"
p:password="${jdbc.password}"
p:maxTotal="10"
p:minIdle="1"
p:maxWaitMillis="5000"
p:defaultAutoCommit="false" />
# (3)
# Admin DataSource settings.
admin.h2.jdbc.driver=org.h2.Driver
admin.h2.jdbc.url=jdbc:h2:mem:batch;DB_CLOSE_DELAY=-1
admin.h2.jdbc.username=sa
admin.h2.jdbc.password=
# (4)
# Job DataSource settings.
jdbc.driver=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/postgres
jdbc.username=postgres
jdbc.password=postgres
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
|
(4) |
|
MyBatisの設定
TERASOLUNA Batch 5.xで、MyBatisの設定をする上で重要な点について説明をする。
バッチ処理を実装する際の重要なポイントの1つとして「大量のデータを一定のリソースで効率よく処理する」が挙げられる。
これに関する設定を説明する。
-
fetchSize
-
一般的なバッチ処理では、大量のデータを処理する際の通信コストを低減するために、 JDBCドライバに適切な
fetchSize
を指定することが必須である。fetchSize
とは、JDBCドライバとデータベース間とで1回の通信で取得するデータ件数を設定するパラメータである。 この値は出来る限り大きい値を設定することが望ましいが、大きすぎるとメモリを圧迫するため、注意が必要である。 ユーザにてチューニングする必要がある箇所と言える。 -
MyBatisでは、全クエリ共通の設定として
defaultFetchSize
を設定することができ、さらにクエリごとのfetchSize
設定で上書きできる。
-
-
executorType
-
一般的なバッチ処理では、同一トランザクション内で同じSQLを
全データ件数/fetchSize
の回数分実行することになる。 この際、都度ステートメントを作成するのではなく再利用することで効率よく処理できる。 -
MyBatisの設定における、
defaultExecutorType
にREUSE
を設定することでステートメントの再利用ができ、 処理スループット向上に寄与する。 -
大量のデータを一度に更新する場合、JDBCのバッチ更新を利用することで性能向上が期待できる。
そのため、MyBatisBatchItemWriter
で利用するSqlSessionTemplate
には、
executorType
に(REUSE
ではなく)BATCH
が設定されている。
-
TERASOLUNA Batch 5.xでは、同時に2つの異なるExecutorType
が存在する。
一方のExecutorType
で実装する場合が多いと想定するが、併用時は特に注意が必要である。
この点は、ItemReader・ItemWriter以外のデータベースアクセスにて詳しく説明する。
MyBatisのその他のパラメータ
その他のパラメータに関しては以下リンクを参照し、 アプリケーションの特性にあった設定を行うこと。 |
以下にデフォルト提供されている設定を示す。
<bean id="jobSqlSessionFactory"
class="org.mybatis.spring.SqlSessionFactoryBean"
p:dataSource-ref="jobDataSource">
<!-- (1) -->
<property name="configuration">
<bean class="org.apache.ibatis.session.Configuration"
p:localCacheScope="STATEMENT"
p:lazyLoadingEnabled="true"
p:aggressiveLazyLoading="false"
p:defaultFetchSize="1000"
p:defaultExecutorType="REUSE"/>
</property>
</bean>
<!-- (2) -->
<bean id="batchModeSqlSessionTemplate"
class="org.mybatis.spring.SqlSessionTemplate"
c:sqlSessionFactory-ref="jobSqlSessionFactory"
c:executorType="BATCH"/>
項番 | 説明 |
---|---|
(1) |
MyBatisの各種設定を行う。 |
(2) |
|
adminDataSourceを利用したSqlSessionFactoryの定義箇所について
同期実行をする場合は、adminDataSourceを利用した META-INF/spring/async-batch-daemon.xml
|
Mapper XMLの定義
TERASOLUNA Batch 5.x特有の説明事項はないので、TERASOLUNA Server 5.x 開発ガイドラインの データベースアクセス処理の実装を参照してほしい。
MyBatis-Springの設定
MyBatis-Springが提供するItemReaderおよびItemWriterを使用する場合、MapperのConfigで使用するMapper XMLを設定する必要がある。
設定方法としては、以下の2つが考えられる。
-
共通設定として、すべてのジョブで使用するMapper XMLを登録する。
-
META-INF/spring/launch-context.xml
にすべてのMapper XMLを記述することになる。
-
-
個別設定として、ジョブ単位で利用するMapper XMLを登録する。
-
META-INF/jobs/
配下のBean定義に、個々のジョブごとに必要なMapper XMLを記述することになる。
-
共通設定をしてしまうと、同期実行をする際に実行するジョブのMapper XMLだけでなく、その他のジョブが使用するMapper XMLも読み込んでしまうために以下に示す弊害が生じる。
-
ジョブの起動までに時間がかかる
-
メモリリソースの消費が大きくなる
これを回避するために、TERASOLUNA Batch 5.xでは、個別設定として、個々のジョブ定義でそのジョブが必要とするMapper XMLだけを指定する設定方法を採用する。
基本的な設定方法については、TERASOLUNA Server 5.x 開発ガイドラインの MyBatis-Springの設定を参照してほしい。
TERASOLUNA Batch 5.xでは、複数のSqlSessionFactory
およびSqlSessionTemplate
が定義されているため、
どれを利用するか明示的に指定する必要がある。
基本的にはjobSqlSessionFactory
を指定すればよい。
以下に設定例を示す。
<!-- (1) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
factory-ref="jobSqlSessionFactory"/>
項番 | 説明 |
---|---|
(1) |
|
ItemReaderにおけるデータベースアクセス
ここではItemReaderによるデータベースアクセスについて説明する。
MyBatisのItemReader
MyBatis-Springが提供するItemReaderとして下記の2つが存在する。
-
org.mybatis.spring.batch.MyBatisCursorItemReader
-
org.mybatis.spring.batch.MyBatisPagingItemReader
MyBatisPagingItemReader
は、TERASOLUNA Server 5.x 開発ガイドラインの
Entityのページネーション検索(SQL絞り込み方式)で
説明している仕組みを利用したItemReaderである。
一定件数を取得した後に再度SQLを発行するため、データの一貫性が保たれない可能性がある。
そのため、バッチ処理で利用するには危険であることから、TERASOLUNA Batch 5.xでは原則使用しない。
TERASOLUNA Batch 5.xではMyBatisCursorItemReader
のみを利用する。
TERASOLUNA Batch 5.xでは、MyBatis-Springの設定で説明したとおり、
mybatis:scan
によって動的にMapper XMLを登録する方法を採用している。
そのため、Mapper XMLに対応するインターフェースを用意する必要がある。
詳細については、TERASOLUNA Server 5.x 開発ガイドラインの
データベースアクセス処理の実装を参照。
MyBatisCursorItemReader
の利用例を以下に示す。
<!-- (1) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository.mst"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- (5) -->
<mapper namespace="org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository">
<!-- (6) -->
<select id="findAll"
resultType="org.terasoluna.batch.functionaltest.app.model.mst.Customer">
<![CDATA[
SELECT
customer_id AS customerId,
customer_name AS customerName,
customer_address AS customerAddress,
customer_tel AS customerTel,
charge_branch_id AS chargeBranchId,
create_date AS createDate,
update_date AS updateDate
FROM
customer_mst
ORDER by
charge_branch_id ASC, customer_id ASC
]]>
</select>
<!-- omitted -->
</mapper>
public interface CustomerRepository {
// (7)
List<Customer> findAll();
// omitted
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。 |
(6) |
SQLを定義する。 |
(7) |
(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。 |
ItemWriterにおけるデータベースアクセス
ここではItemWriterによるデータベースアクセスについて説明する。
MyBatisのItemWriter
MyBatis-Springが提供するItemWriterは以下の1つのみである。
-
org.mybatis.spring.batch.MyBatisBatchItemWriter
基本的な設定については、MyBatisのItemReaderと同じである。
MyBatisBatchItemWriter
では、MyBatisの設定で説明した
batchModeSqlSessionTemplate
を指定する必要がる。
MyBatisBatchItemWriter
の定義例を以下に示す。
<!-- (1) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository.plan"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="detailWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
p:sqlSessionTemplate="batchModeSqlSessionTemplate"/>
<!-- omitted -->
<!-- (5) -->
<mapper namespace="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">
<!-- (6) -->
<insert id="create"
parameterType="org.terasoluna.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
INSERT INTO
sales_plan_detail(branch_id, year, month, customer_id, amount)
VALUES (
#{branchId}, #{year}, #{month}, #{customerId}, #{amount}
)
]]>
</insert>
<!-- omitted -->
</mapper>
public interface SalesPlanDetailRepository {
// (7)
void create(SalesPlanDetail salesPlanDetail);
// omitted
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。 |
(6) |
SQLを定義する。 |
(7) |
(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。 |
ItemReader・ItemWriter以外のデータベースアクセス
ItemReader・ItemWriter以外のデータベースアクセスについて説明する。
ItemReader・ItemWriter以外でデータベースアクセスするには、Mapperインターフェースを利用する。 Mapperインターフェースを利用するにあたって、TERASOLUNA Batch 5.xでは以下の制約を設けている。
処理 | ItemProcessor | Tasklet | リスナー |
---|---|---|---|
参照 |
利用可 |
利用可 |
利用可 |
更新 |
条件付で利用可 |
利用可 |
利用不可 |
- ItemProcessorでの制約
-
MyBatisには、同一トランザクション内で2つ以上の
ExecutorType
で実行してはいけないという制約がある。
「ItemWriterにMyBatisBatchItemWriter
を使用する」と「ItemProcessorでMapperインターフェースを使用し参照更新をする」を 同時に満たす場合は、この制約に抵触する。
制約を回避するには、ItemProcessorではExecutorType
がBATCH
のMapperインターフェースによって データベースアクセスすることになる。
加えて、MyBatisBatchItemWriter
ではSQL実行後のステータスチェックにより、自身が発行したSQLかどうかチェックしているのだが、 当然ItemProcessorによるSQL実行は管理できないためエラーが発生してしまう。
よって、MyBatisBatchItemWriter
を利用している場合は、Mapperインターフェースによる更新はできなくなり、参照のみとなる。
|
- Taskletでの制約
-
Taskletでは、Mapperインターフェースを利用することが基本であるため、ItemProcessorのような影響はない。
MyBatisBatchItemWriter
をInjectして利用することも考えられるが、その場合はMapperインターフェース自体をBATCH
設定で処理すればよい。つまり、Taskletでは、MyBatisBatchItemWriter
をInjectして使う必要は基本的にない。 - リスナーでの制約
-
リスナーでもItemProcessorでの制約と同じ制約が成立する。 加えて、リスナーでは、更新を必要とするユースケースが考えにくい。よって、リスナーでは、更新系処理を禁止する。
リスナーで想定される更新処理の代替
|
ItemProcessorでのデータベースアクセス
ItemProcessorでのデータベースアクセス例を説明する。
@Component
public class UpdateItemFromDBProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
// (1)
@Inject
CustomerRepository customerRepository;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
// (2)
Customer customer = customerRepository.findOne(readItem.getCustomerId());
// (3)
SalesPlanDetail writeItem = new SalesPlanDetail();
writeItem.setBranchId(customer.getChargeBranchId());
writeItem.setYear(readItem.getYear());
writeItem.setMonth(readItem.getMonth());
writeItem.setCustomerId(readItem.getCustomerId());
writeItem.setAmount(readItem.getAmount());
return writeItem;
}
}
<!-- (2) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository"
template-ref="batchModeSqlSessionTemplate"/>
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId"org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<batch:job id="DBAccessByItemProcessor" job-repository="jobRepository">
<batch:step id="DBAccessByItemProcessor.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- (4) -->
<batch:chunk reader="reader"
processor="updateItemFromDBProcessor"
writer="writer" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
MapperインターフェースとMapper XMLは省略する。
項番 |
説明 |
(1) |
MapperインターフェースをInjectする。 |
(2) |
Mapper XMLの登録を行う。 |
(3) |
|
(4) |
MapperインターフェースをInjectしたItemProcessorを設定する。 |
MyBatisCursorItemReader設定の補足
以下に示す定義例のように、MyBatisCursorItemReaderとMyBatisBatchItemWriterで異なる
|
Taskletでのデータベースアクセス
Taskletでのデータベースアクセス例を説明する。
@Component
public class OptimisticLockTasklet implements Tasklet {
// (1)
@Inject
ExclusiveControlRepository repository;
// omitted
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
Branch branch = repository.branchFindOne(branchId); // (2)
ExclusiveBranch exclusiveBranch = new ExclusiveBranch();
exclusiveBranch.setBranchId(branch.getBranchId());
exclusiveBranch.setBranchName(branch.getBranchName() + " - " + identifier);
exclusiveBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
exclusiveBranch.setBranchTel(branch.getBranchTel());
exclusiveBranch.setCreateDate(branch.getUpdateDate());
exclusiveBranch.setUpdateDate(new Timestamp(System.currentTimeMillis()));
exclusiveBranch.setOldBranchName(branch.getBranchName());
int result = repository.branchExclusiveUpdate(exclusiveBranch); // (3)
return RepeatStatus.FINISHED;
}
}
<!-- (4) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository"
factory-ref="jobSqlSessionFactory"/>
<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="taskletOptimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager"
ref="optimisticLockTasklet"> <!-- (5) -->
</batch:tasklet>
</batch:step>
</batch:job>
MapperインターフェースとMapper XMLは省略する。
項番 |
説明 |
(1) |
MapperインターフェースをInjectする。 |
(2) |
Mapperインターフェースで検索処理を実行する。 |
(3) |
Mapperインターフェースで更新処理を実行する。 |
(4) |
Mapper XMLの登録を行う。 |
(5) |
MapperインターフェースをInjectしTaskletを設定する。 |
batchModeSqlSessionTemplateの利用
タスクレットモデルでの更新処理が多い場合は、 |
リスナーでのデータベースアクセス
リスナーでのデータベースアクセスは他のコンポーネントと連携することが多い。 使用するリスナー及び実装方法によっては、Mapperインターフェースで取得したデータを、 他のコンポーネントへ引き渡す仕組みを追加で用意する必要がある。
ここでは一例として、StepExecutionListenerで ステップ実行前にデータを取得して、ItemProcessorで取得したデータを利用する例を示す。
public class CacheSetListener extends StepExecutionListenerSupport {
// (1)
@Inject
CustomerRepository customerRepository;
// (2)
@Inject
CustomerCache cache;
@Override
public void beforeStep(StepExecution stepExecution) {
// (3)
customerRepository.findAll().forEach(customer ->
cache.addCustomer(customer.getCustomerId(), customer));
}
}
@Component
public class UpdateItemFromCacheProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
// (4)
@Inject
CustomerCache cache;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
Customer customer = cache.getCustomer(readItem.getCustomerId()); // (5)
SalesPlanDetail writeItem = new SalesPlanDetail();
// omitted
writerItem.setCustomerName(customer.getCustomerName); // (6)
return writeItem;
}
}
// (7)
@Component
public class CustomerCache {
Map<String, Customer> customerMap = new HashMap<>();
public Customer getCustomer(String customerId) {
return customerMap.get(customerId);
}
public void addCustomer(String id, Customer customer) {
customerMap.put(id, customer);
}
}
<!-- omitted -->
<!-- (8) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository"
template-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="cacheSetListener"
class="org.terasoluna.batch.functionaltest.ch05.dbaccess.CacheSetListener"/>
<!-- omitted -->
<batch:job id="DBAccessByItemListener" job-repository="jobRepository">
<batch:step id="DBAccessByItemListener.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader"
processor="updateItemFromCacheProcessor"
writer="writer" commit-interval="10"/> <!-- (10) -->
<!-- (11) -->
<batch:listeners>
<batch:listener ref="cacheSetListener"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:job>
項番 |
説明 |
(1) |
MapperインターフェースをInjectする。 |
(2) |
Mapperインターフェースから取得したデータをキャッシュするためのBeanをInjectする。 |
(3) |
リスナーにて、Mapperインターフェースからデータを取得してキャッシュする。 |
(4) |
(2)で設定したキャッシュと同じBeanをInjectする。 |
(5) |
キャッシュから該当するデータを取得する。 |
(6) |
更新データにキャッシュからのデータを反映する。 |
(7) |
キャッシュクラスをコンポーネントとして実装する。 |
(8) |
Mapper XMLの登録を行う。 |
(9) |
Mapperインターフェースを利用するリスナーを定義する。 |
(10) |
キャッシュを利用するItemProcessorを指定する。 |
(11) |
(9)で定義したリスナーを登録する。 |
リスナーでのSqlSessionFactoryの利用
上記の例では、 チャンクのスコープ外で動作するリスナーについては、トランザクション外で処理されるため、
|