しおしお

IntelliJ IDEAのことなんかを書いてます

Spring Batchのchunkステップを並列で実行してみた

Spring Batchのchunkステップの各処理を並列実行してみました。

並列実行するためのジョブ定義

Readerで1から100までの連番を生成して、Writerでtestテーブルにbatch insertするだけのシンプルなchunkステップを持つジョブを使って試します。 並列実行されていることを確認するためにWriterでitemsをログに出力しています。

@Configuration
class BatchConfiguration(
        private val jobBuilderFactory: JobBuilderFactory,
        private val stepBuilderFactory: StepBuilderFactory,
        private val jdbcTemplate: JdbcTemplate
) {
    
    private val logger = LoggerFactory.getLogger(BatchConfiguration::class.java)
    
    @Bean
    fun job(): Job {
        return jobBuilderFactory.get("job")
                .incrementer(RunIdIncrementer())
                .start(step())
                .build()
    }

    @Bean
    fun step(): Step {
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
                .chunk<Int, Int>(10)
                .reader(fun(): Int? {
                    return if (input.hasNext()) {
                        input.nextInt()
                    } else {
                        null
                    }
                })
                .writer {
                    logger.info("write size: ${it.size}, items: ${it}")
                    jdbcTemplate.batchUpdate("insert into test (id) values (?)", object : BatchPreparedStatementSetter {
                        override fun setValues(ps: PreparedStatement, i: Int) {
                            ps.setInt(1, it[i])
                        }

                        override fun getBatchSize(): Int {
                            return it.size
                        }
                    })
                }
                .build()
    }
}

並列実行の構成をせずにJOBを実行した結果

並列実行の構成を行わなかった場合、Writerの処理がmainスレッドで行われていることがわかります。

2019-07-14 07:11:45.608  INFO 22744 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2019-07-14 07:11:45.623  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2019-07-14 07:11:45.633  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
2019-07-14 07:11:45.642  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
2019-07-14 07:11:45.648  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
2019-07-14 07:11:45.653  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:11:45.659  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:11:45.666  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:11:45.672  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:11:45.679  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:11:45.685  INFO 22744 --- [           main] s.springbatchsample.BatchConfiguration   : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:11:45.711  INFO 22744 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=59}] and the following status: [COMPLETED]

並列実行するための構成を追加

並列実行するためのTaskExecutorが必要となるためインジェクションします。 TaskExecutorは、org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfigurationが生成してくれるので、特にBeanを生成しなくても利用できます。

@Configuration
class BatchConfiguration(
        private val jobBuilderFactory: JobBuilderFactory,
        private val stepBuilderFactory: StepBuilderFactory,
        private val jdbcTemplate: JdbcTemplate,
        private val taskExecutor: TaskExecutor
) {

インジェクションしたTaskExecutorをステップ構築時に設定(taskExecutorメソッドに)します。 また、並列に実行されては困る部分は、適宜同期処理を入れる必要が有ります。今回のステップではReaderの処理が同時に実行されると同じ番号を返す必要があるため@Synchronizedを追加しています。

    @Bean
    fun step(): Step {
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
                .chunk<Int, Int>(10)
                .reader(@Synchronized fun(): Int? {
                  // 省略
                })
                .writer {
                  // 省略
                }
                .taskExecutor(taskExecutor)
                .build()
    }

実行結果

Writerで出力しているログ内容から各タスクが並列実行(ログのスレッド名からスレッド数は8)されていることがわかります。
※デフォルトのスレッド数が8となっているので、この結果となります。

2019-07-14 07:40:51.342  INFO 24520 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2019-07-14 07:40:51.403  INFO 24520 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [1, 7, 11, 14, 17, 20, 24, 28, 32, 36]
2019-07-14 07:40:51.404  INFO 24520 --- [         task-3] s.springbatchsample.BatchConfiguration   : write size: 10, items: [3, 5, 21, 25, 29, 33, 37, 38, 39, 40]
2019-07-14 07:40:51.405  INFO 24520 --- [         task-1] s.springbatchsample.BatchConfiguration   : write size: 10, items: [2, 8, 10, 13, 16, 18, 22, 26, 31, 35]
2019-07-14 07:40:51.406  INFO 24520 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [4, 6, 9, 12, 15, 19, 23, 27, 30, 34]
2019-07-14 07:40:51.436  INFO 24520 --- [         task-5] s.springbatchsample.BatchConfiguration   : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:40:51.445  INFO 24520 --- [         task-6] s.springbatchsample.BatchConfiguration   : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:40:51.452  INFO 24520 --- [         task-7] s.springbatchsample.BatchConfiguration   : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:40:51.460  INFO 24520 --- [         task-8] s.springbatchsample.BatchConfiguration   : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:40:51.467  INFO 24520 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:40:51.473  INFO 24520 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:40:51.618  INFO 24520 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=63}] and the following status: [COMPLETED]

スレッド数を変更してみる

スレッド数は、application.propertiesで設定できます。 例えば、スレッド数を4としたい場合には、下のように設定します。

spring.task.execution.pool.core-size=4

実行結果

スレッド名が、task-1からtask-4までとなっていてスレッド数が4に制限されたことがわかります。

2019-07-14 07:48:49.039  INFO 24812 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2019-07-14 07:48:49.062  INFO 24812 --- [         task-1] s.springbatchsample.BatchConfiguration   : write size: 10, items: [3, 8, 11, 15, 19, 23, 27, 31, 35, 39]
2019-07-14 07:48:49.063  INFO 24812 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [1, 6, 10, 13, 17, 21, 25, 29, 34, 37]
2019-07-14 07:48:49.063  INFO 24812 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [2, 5, 9, 14, 18, 22, 26, 30, 33, 38]
2019-07-14 07:48:49.063  INFO 24812 --- [         task-3] s.springbatchsample.BatchConfiguration   : write size: 10, items: [4, 7, 12, 16, 20, 24, 28, 32, 36, 40]
2019-07-14 07:48:49.076  INFO 24812 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:48:49.083  INFO 24812 --- [         task-3] s.springbatchsample.BatchConfiguration   : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:48:49.090  INFO 24812 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:48:49.096  INFO 24812 --- [         task-1] s.springbatchsample.BatchConfiguration   : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:48:49.101  INFO 24812 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:48:49.107  INFO 24812 --- [         task-3] s.springbatchsample.BatchConfiguration   : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:48:49.178  INFO 24812 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=64}] and the following status: [COMPLETED]

ステップで使用できるスレッド数を制限する

ステップ構築時に、throttleLimitを設定することで制限することができます。 デフォルトでは、スレッド数が8でthrottleLimitが4なので、スッテップでは同時に4スレッドのみが使われます。 多分ですが、Flowを使用して同時に複数のステップを実行するような構成とした場合に使うのかなと思います。

今回は、throttleLimitを2にしてWriterにスリープ処理を入れて制限がかかっていることを確認しやすくしています。

    @Bean
    fun step(): Step {
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
                .chunk<Int, Int>(10)
                .reader(@Synchronized fun(): Int? {
                  // 省略
                })
                .writer {
                    logger.info("write size: ${it.size}, items: ${it}")
                    TimeUnit.SECONDS.sleep(5)
                    jdbcTemplate.batchUpdate("insert into test (id) values (?)", object : BatchPreparedStatementSetter {
                        override fun setValues(ps: PreparedStatement, i: Int) {
                            ps.setInt(1, it[i])
                        }

                        override fun getBatchSize(): Int {
                            return it.size
                        }
                    })
                }
                .taskExecutor(taskExecutor)
                .throttleLimit(2)
                .build()
    }

実行結果

ログから、2スレッド処理する毎に5秒間が開くのでthrottleLimitでスレッド数が制限されていることがわかります。

2019-07-14 08:07:10.815  INFO 26547 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2019-07-14 08:07:10.834  INFO 26547 --- [         task-1] s.springbatchsample.BatchConfiguration   : write size: 10, items: [1, 4, 6, 7, 9, 11, 13, 15, 17, 19]
2019-07-14 08:07:10.834  INFO 26547 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [2, 3, 5, 8, 10, 12, 14, 16, 18, 20]
2019-07-14 08:07:15.889  INFO 26547 --- [         task-3] s.springbatchsample.BatchConfiguration   : write size: 10, items: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
2019-07-14 08:07:15.898  INFO 26547 --- [         task-4] s.springbatchsample.BatchConfiguration   : write size: 10, items: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
2019-07-14 08:07:20.932  INFO 26547 --- [         task-5] s.springbatchsample.BatchConfiguration   : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 08:07:20.947  INFO 26547 --- [         task-6] s.springbatchsample.BatchConfiguration   : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 08:07:25.974  INFO 26547 --- [         task-7] s.springbatchsample.BatchConfiguration   : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 08:07:25.990  INFO 26547 --- [         task-8] s.springbatchsample.BatchConfiguration   : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 08:07:31.018  INFO 26547 --- [         task-2] s.springbatchsample.BatchConfiguration   : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 08:07:31.037  INFO 26547 --- [         task-1] s.springbatchsample.BatchConfiguration   : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 08:07:36.095  INFO 26547 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=71}] and the following status: [COMPLETED]