しおしお

IntelliJのあれやこれや

JSR352-Batch Applicationを試してみた(Chunkステップ編)

JSR352-Batch Applicationを試してみた(Batchlet編) - しおしおの続編です。今回は、chunkステップを試してみます。

chunkステップは、ItemReaderでデータベースやファイルなどから読んだデータをItemProcessorで変換(ビジネスロジックで色々導出したりしてItemWriterで出力するオブジェクトを生成する感じかな)し、ItemWriterでデータベースやファイルに出力します。
writerに渡されるデータは、ジョブXMLのchunkステップで設定した件数分纏めて渡されてきます。
例えば、item-countに1000を指定した場合、readerとprocessorが1000回繰り返し実行されたあとに、1000件のデータがwriterに渡されます。*1

ジョブの構成

ItemReaderでは1から10までの連番をリードします。ItemProcessorでは、ItemReaderでリードしたデータ(連番)からUserEntityを生成して返します。ItemWriterでは、JPAを使用してUserEntityの情報をデータベースに保存しています。

ジョブXMLは以下の内容になっています。処理する件数が少ないのでChunkサイズ(item-count)も小さめに設定しています。

<job id="chunk-sample" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <step id="step">
    <chunk item-count="3">
      <reader ref="sampleReader"/>
      <processor ref="sampleProcessor" />
      <writer ref="userWriter"/>
    </chunk>
  </step>
</job>

ItemReaderの実装

ItemReaderは、AbstractItemReaderを継承して作成します。ItemReaderインタフェースを実装してもいいだけど、AbstractItemReaderは必須じゃないメソッドは空実装持っててくれるので不要なメソッドは実装しないで良い感じです。
ItemReaderの実装では、readItemメソッドで外部リソースから読み込んだ情報を返します。今回は単純にインクリメントした値を返しているだけです。
もし、これ以上読み取るデータがない場合には、nullを返却します。nullを返却するとこれ以上扱うレコードがないと判断され、最後の纏まりがItemWriterで処理されたあとにChunkステップが終了します。

今回の例では、外部リソースを必要としないので、リソースを開いたり閉じたりなどの処理は実装していません。

package siosio.chunk;

import javax.batch.api.chunk.AbstractItemReader;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named
public class SampleReader extends AbstractItemReader {

    int index = 0;

    @Override
    public Object readItem() throws Exception {
        index++;
        if (index <= 10) {
            return index;
        }
        // これ以上レコードがない場合はnullを返す
        return null;
    }
}

ItemProcessorの実装

ItemProcessorはItemProcessorインタフェースをimplementsして作成します。メソッドは1つだけなので、ItemReaderやWriterのように空実装をもった抽象クラスは提供されていません。

今回の例では、ItemReaderでリードした連番を、出力ようにEntityに変換しています。

package siosio.chunk;

import javax.batch.api.chunk.ItemProcessor;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import siosio.entity.UserEntity;

@Named
@Dependent
public class SampleProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        return new UserEntity("user name " + item);
    }
}

ItemWriterの実装

ItemWriterはItemReaderと同じ理由でAbstractItemWriterを継承して作成します。

今回は、ItemProcessorで変換した値をJPAを使ってデータベースに保存しています。

package siosio.chunk;

import java.util.List;

import javax.batch.api.chunk.AbstractItemWriter;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@Dependent
@Named
public class UserWriter extends AbstractItemWriter {

    @PersistenceContext
    EntityManager em;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object item : items) {
            em.persist(item);
        }
    }
}

実行ログ

hibernateのログです。ちゃんとインサートできてますね。

00:02:00,762 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,769 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,783 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,788 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,790 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 1]
00:02:00,790 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [21]
00:02:00,791 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,791 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 2]
00:02:00,791 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [22]
00:02:00,791 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,792 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 3]
00:02:00,795 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [23]
00:02:00,806 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,808 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,810 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,816 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,818 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 4]
00:02:00,819 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [24]
00:02:00,822 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,822 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 5]
00:02:00,822 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [25]
00:02:00,823 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,824 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 6]
00:02:00,824 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [26]
00:02:00,835 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,837 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,840 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,846 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,858 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 7]
00:02:00,858 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [27]
00:02:00,859 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,859 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 8]
00:02:00,859 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [28]
00:02:00,859 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,860 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 9]
00:02:00,860 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [29]
00:02:00,865 DEBUG [org.hibernate.SQL] (Batch Thread - 3) select nextval ('hibernate_sequence')
00:02:00,870 DEBUG [org.hibernate.SQL] (Batch Thread - 3) insert into users (name, id) values (?, ?)
00:02:00,871 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [1] as [VARCHAR] - [user name 10]
00:02:00,871 TRACE [org.hibernate.type.descriptor.sql.BasicBinder] (Batch Thread - 3) binding parameter [2] as [BIGINT] - [30]

一応テストを

JSR352-Batch Applicationを試してみた(Batchletをテストしてみた) - しおしおと同じようにArquillianを使ってテストしてみたのでテストコードを。。。

package siosio.chunk;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.List;

import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.UserTransaction;

import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.gradle.archive.importer.embedded.EmbeddedGradleImporter;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import siosio.TestHelper;
import siosio.entity.UserEntity;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(Arquillian.class)
public class SampleChunkTest {

    @Deployment
    public static WebArchive createDeployment() {
        return ShrinkWrap.create(EmbeddedGradleImporter.class)
                .forThisProjectDirectory()
                .importBuildOutput()
                .as(WebArchive.class)
                .addClasses(TestHelper.class);
    }

    @PersistenceContext
    EntityManager em;

    @Inject
    UserTransaction utx;

    @Before
    public void setUp() throws Exception {
        utx.begin();
        final Query query = em.createNativeQuery("delete from USERS");
        query.executeUpdate();
        utx.commit();
    }


    @Test
    public void testJobCompleted() throws Exception {
        final JobExecution jobExecution = TestHelper.start("chunk-sample");
        assertThat("正常終了していること", jobExecution.getBatchStatus(), is(BatchStatus.COMPLETED));

        final List<UserEntity> result = em.createNamedQuery("findUsersAll", UserEntity.class)
                .getResultList();

        assertThat("10レコード登録されていること", result.size(), is(10));

        for (int i = 0; i < 10; i++) {
            int index = i + 1;
            assertThat("ユーザ名", result.get(i)
                    .getName(), is("user name " + index));
        }
    }
}

おわり。

*1:ItemReaderやItemWriterの実装は、 jsr352/jberet-support/src/main/java/org/jberet/support/io at master · jberet/jsr352 · GitHubがかなり参考になります