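I tried running the chunk processing of a Spring Batch chunk step in parallel.
Job definition used for the test
The test job has a single, simple chunk step: the Reader generates the sequence 1 to 100, and the Writer batch-inserts the items into the test table.
To confirm that the chunks are processed in parallel, the Writer logs the items it receives.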
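import java.sql.PreparedStatement
import org.slf4j.LoggerFactory
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.launch.support.RunIdIncrementer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.BatchPreparedStatementSetter
import org.springframework.jdbc.core.JdbcTemplate

// JobBuilderFactory / StepBuilderFactory beans come from @EnableBatchProcessing
// (assumed to be declared elsewhere, e.g. on the application class).
@Configuration
class BatchConfiguration(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory,
    private val jdbcTemplate: JdbcTemplate
) {
    private val logger = LoggerFactory.getLogger(BatchConfiguration::class.java)

    @Bean
    fun job(): Job {
        return jobBuilderFactory.get("job")
            .incrementer(RunIdIncrementer())
            .start(step())
            .build()
    }

    @Bean
    fun step(): Step {
        // Source data: the numbers 1..100
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
            .chunk<Int, Int>(10) // commit every 10 items
            .reader(fun(): Int? {
                // Returning null tells Spring Batch that the input is exhausted
                return if (input.hasNext()) {
                    input.nextInt()
                } else {
                    null
                }
            })
            .writer {
                // Log each chunk so we can see which thread wrote which items
                logger.info("write size: ${it.size}, items: ${it}")
                jdbcTemplate.batchUpdate("insert into test (id) values (?)", object : BatchPreparedStatementSetter {
                    override fun setValues(ps: PreparedStatement, i: Int) {
                        ps.setInt(1, it[i])
                    }

                    override fun getBatchSize(): Int {
                        return it.size
                    }
                })
            }
            .build()
    }
}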
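As a rough mental model of what this step does (a simplification, not Spring Batch's actual implementation), the chunk-oriented loop can be sketched in plain Kotlin. It reads items until a chunk of the given size is full or the reader returns null, then passes the whole chunk to the writer. The function name runChunkStep is hypothetical.

```kotlin
// Single-threaded model of a chunk-oriented step (illustrative only, not Spring Batch code).
// read() returns null when the input is exhausted, like the reader in the job above.
fun <T> runChunkStep(
    chunkSize: Int,
    read: () -> T?,
    write: (List<T>) -> Unit
) {
    require(chunkSize > 0) { "chunkSize must be positive" }
    while (true) {
        val chunk = ArrayList<T>(chunkSize)
        // Read one item at a time until the chunk is full or the reader is exhausted
        while (chunk.size < chunkSize) {
            val item = read() ?: break
            chunk.add(item)
        }
        if (chunk.isEmpty()) return          // nothing left to process
        write(chunk)                         // the writer receives the whole chunk at once
        if (chunk.size < chunkSize) return   // the reader ran out in the middle of this chunk
    }
}
```

With the same reader as in the job, `runChunkStep<Int>(10, { if (input.hasNext()) input.nextInt() else null }, { println(it) })` writes ten chunks, from 1 to 10 up to 91 to 100, all on the calling thread.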
Result of running the job without the parallel configuration
Without the parallel configuration, the Writer runs on the main thread for every chunk.
2019-07-14 07:11:45.608 INFO 22744 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
2019-07-14 07:11:45.623 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2019-07-14 07:11:45.633 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
2019-07-14 07:11:45.642 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
2019-07-14 07:11:45.648 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
2019-07-14 07:11:45.653 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:11:45.659 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:11:45.666 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:11:45.672 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:11:45.679 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:11:45.685 INFO 22744 --- [ main] s.springbatchsample.BatchConfiguration : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:11:45.711 INFO 22744 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=59}] and the following status: [COMPLETED]
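Adding the configuration for parallel execution
Parallel execution requires a TaskExecutor, so inject one into the configuration class.
A TaskExecutor bean is created by org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration (Spring Boot 2.1 and later), so you can use it without defining the bean yourself.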
import org.springframework.core.task.TaskExecutor

@Configuration
class BatchConfiguration(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory,
    private val jdbcTemplate: JdbcTemplate,
    private val taskExecutor: TaskExecutor // provided by TaskExecutionAutoConfiguration
) {
Then pass the injected TaskExecutor to the step builder through its taskExecutor method.
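Any part that must not run concurrently also needs appropriate synchronization. In this step, if the Reader were called by several threads at the same time, they could read the same number from the shared iterator, so the reader function is annotated with @Synchronized.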
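To see why the lock matters, here is a JDK-only sketch (no Spring Batch involved) in which several threads drain one shared iterator through a synchronized read function. SynchronizedIntReader and readConcurrently are hypothetical names. The sketch uses an explicit synchronized block rather than the @Synchronized annotation so that it is obvious which object is locked.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical wrapper: makes a plain (non-thread-safe) iterator safe to read from many threads.
class SynchronizedIntReader(range: IntRange) {
    private val input = range.iterator()
    private val lock = Any()

    // hasNext() and nextInt() must run as one atomic step; otherwise two threads can both
    // pass hasNext() and then race on nextInt(), returning duplicates or throwing.
    fun read(): Int? = synchronized(lock) {
        if (input.hasNext()) input.nextInt() else null
    }
}

// Drains the reader from `threads` worker threads at once and returns every item read.
fun readConcurrently(reader: SynchronizedIntReader, threads: Int): List<Int> {
    val results = ConcurrentLinkedQueue<Int>()
    val pool = Executors.newFixedThreadPool(threads)
    repeat(threads) {
        pool.execute {
            while (true) {
                val item = reader.read() ?: break // null means the input is exhausted
                results.add(item)
            }
        }
    }
    pool.shutdown()
    check(pool.awaitTermination(10, TimeUnit.SECONDS)) { "workers did not finish" }
    return results.toList()
}
```

`readConcurrently(SynchronizedIntReader(1..100), 8)` should return each number from 1 to 100 exactly once, in no particular order.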
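    @Bean
    fun step(): Step {
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
            .chunk<Int, Int>(10)
            // @Synchronized: only one thread at a time may call the reader
            .reader(@Synchronized fun(): Int? {
                // (same body as in the first listing)
            })
            .writer {
                // (same body as in the first listing)
            }
            .taskExecutor(taskExecutor) // process chunks on the injected executor
            .build()
    }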
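Execution result
The Writer's log shows that the chunks are processed in parallel. Judging from the thread names, 8 threads were used.
This is because the default pool size of the auto-configured executor (spring.task.execution.pool.core-size) is 8.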
2019-07-14 07:40:51.342 INFO 24520 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
2019-07-14 07:40:51.403 INFO 24520 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [1, 7, 11, 14, 17, 20, 24, 28, 32, 36]
2019-07-14 07:40:51.404 INFO 24520 --- [ task-3] s.springbatchsample.BatchConfiguration : write size: 10, items: [3, 5, 21, 25, 29, 33, 37, 38, 39, 40]
2019-07-14 07:40:51.405 INFO 24520 --- [ task-1] s.springbatchsample.BatchConfiguration : write size: 10, items: [2, 8, 10, 13, 16, 18, 22, 26, 31, 35]
2019-07-14 07:40:51.406 INFO 24520 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [4, 6, 9, 12, 15, 19, 23, 27, 30, 34]
2019-07-14 07:40:51.436 INFO 24520 --- [ task-5] s.springbatchsample.BatchConfiguration : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:40:51.445 INFO 24520 --- [ task-6] s.springbatchsample.BatchConfiguration : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:40:51.452 INFO 24520 --- [ task-7] s.springbatchsample.BatchConfiguration : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:40:51.460 INFO 24520 --- [ task-8] s.springbatchsample.BatchConfiguration : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:40:51.467 INFO 24520 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:40:51.473 INFO 24520 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:40:51.618 INFO 24520 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=63}] and the following status: [COMPLETED]
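The thread names task-1 to task-8 come from the auto-configured executor's defaults: a core pool size of 8 and the thread name prefix task-. The following sketch approximates that with a plain java.util.concurrent.ThreadPoolExecutor. It is not Spring's ThreadPoolTaskExecutor, and newTaskExecutor and threadNamesUsed are hypothetical helpers.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// JDK-only approximation: `coreSize` threads named "task-1", "task-2", ... in front of an unbounded queue.
fun newTaskExecutor(coreSize: Int = 8): ExecutorService {
    val counter = AtomicInteger(0)
    val factory = ThreadFactory { r -> Thread(r, "task-${counter.incrementAndGet()}") }
    return ThreadPoolExecutor(
        coreSize, coreSize, 60L, TimeUnit.SECONDS,
        LinkedBlockingQueue<Runnable>(), factory
    )
}

// Submits `tasks` short jobs and returns the names of the threads that ran them.
fun threadNamesUsed(coreSize: Int, tasks: Int): Set<String> {
    val names = ConcurrentHashMap.newKeySet<String>()
    val executor = newTaskExecutor(coreSize)
    repeat(tasks) {
        executor.execute {
            names.add(Thread.currentThread().name)
            Thread.sleep(20) // keep each task busy briefly, like a small chunk
        }
    }
    executor.shutdown()
    check(executor.awaitTermination(10, TimeUnit.SECONDS)) { "tasks did not finish" }
    return names.toSet()
}
```

A ThreadPoolExecutor starts a new thread for each submitted task until the core size is reached. As a result, `threadNamesUsed(8, 10)` returns task-1 through task-8, and `threadNamesUsed(4, 10)` returns only task-1 through task-4.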
Changing the number of threads
The number of threads can be configured in application.properties.
For example, to use 4 threads, set the following:
spring.task.execution.pool.core-size=4
Execution result
The thread names now range from task-1 to task-4, which shows that the number of threads has been limited to 4.
2019-07-14 07:48:49.039 INFO 24812 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
2019-07-14 07:48:49.062 INFO 24812 --- [ task-1] s.springbatchsample.BatchConfiguration : write size: 10, items: [3, 8, 11, 15, 19, 23, 27, 31, 35, 39]
2019-07-14 07:48:49.063 INFO 24812 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [1, 6, 10, 13, 17, 21, 25, 29, 34, 37]
2019-07-14 07:48:49.063 INFO 24812 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [2, 5, 9, 14, 18, 22, 26, 30, 33, 38]
2019-07-14 07:48:49.063 INFO 24812 --- [ task-3] s.springbatchsample.BatchConfiguration : write size: 10, items: [4, 7, 12, 16, 20, 24, 28, 32, 36, 40]
2019-07-14 07:48:49.076 INFO 24812 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 07:48:49.083 INFO 24812 --- [ task-3] s.springbatchsample.BatchConfiguration : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 07:48:49.090 INFO 24812 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 07:48:49.096 INFO 24812 --- [ task-1] s.springbatchsample.BatchConfiguration : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 07:48:49.101 INFO 24812 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 07:48:49.107 INFO 24812 --- [ task-3] s.springbatchsample.BatchConfiguration : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 07:48:49.178 INFO 24812 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=64}] and the following status: [COMPLETED]
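Limiting the number of threads a step can use
You can limit this by setting throttleLimit when building the step.
By default, the pool has 8 threads and throttleLimit is 4, so the step itself uses at most 4 threads at a time.
My guess is that this setting is meant for configurations where a Flow runs several steps at the same time.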
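The effect of a throttle limit can be illustrated without Spring Batch. In the sketch below, a Semaphore with throttleLimit permits stops the submitting thread from handing out more chunks than that limit, even though the pool has more threads. This is only an analogy, not Spring Batch's code, and runThrottled is a hypothetical helper.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Runs `chunks` simulated chunks on a pool of `poolSize` threads, but never more than
// `throttleLimit` at once. Returns the highest number of chunks seen running concurrently.
fun runThrottled(chunks: Int, poolSize: Int, throttleLimit: Int, workMillis: Long): Int {
    val permits = Semaphore(throttleLimit)
    val running = AtomicInteger(0)
    val maxRunning = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(poolSize)
    repeat(chunks) {
        permits.acquire() // the submitter blocks here until a slot is free
        pool.execute {
            val now = running.incrementAndGet()
            maxRunning.accumulateAndGet(now) { a, b -> maxOf(a, b) }
            try {
                Thread.sleep(workMillis) // stands in for the slow writer
            } finally {
                running.decrementAndGet()
                permits.release() // free the slot for the next chunk
            }
        }
    }
    pool.shutdown()
    check(pool.awaitTermination(30, TimeUnit.SECONDS)) { "chunks did not finish" }
    return maxRunning.get()
}
```

For example, `runThrottled(chunks = 10, poolSize = 8, throttleLimit = 2, workMillis = 50)` returns at most 2, even though 8 threads are available.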
Here, throttleLimit is set to 2, and a sleep is added to the Writer so that the effect of the limit is easy to see.
    @Bean
    fun step(): Step {
        val input = (1..100).iterator()
        return stepBuilderFactory.get("step")
            .chunk<Int, Int>(10)
            .reader(@Synchronized fun(): Int? {
                // (same body as in the first listing)
            })
            .writer {
                logger.info("write size: ${it.size}, items: ${it}")
                // java.util.concurrent.TimeUnit: slow the writer down on purpose
                TimeUnit.SECONDS.sleep(5)
                jdbcTemplate.batchUpdate("insert into test (id) values (?)", object : BatchPreparedStatementSetter {
                    override fun setValues(ps: PreparedStatement, i: Int) {
                        ps.setInt(1, it[i])
                    }

                    override fun getBatchSize(): Int {
                        return it.size
                    }
                })
            }
            .taskExecutor(taskExecutor)
            .throttleLimit(2) // at most 2 chunks of this step run at the same time
            .build()
    }
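Execution result
In the log, a 5-second gap appears after every two chunks, which shows that throttleLimit caps the number of concurrent threads at 2.
Thread names task-1 through task-8 still appear because the pool itself keeps 8 threads. throttleLimit only limits how many of them work on this step at once.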
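The timing in the log below can be checked with simple arithmetic. Each chunk sleeps for 5 seconds and at most 2 chunks run at once, so the 10 chunks finish in ceil(10 / 2) = 5 rounds, or about 25 seconds. This agrees with the roughly 25 seconds between the step start (08:07:10.815) and job completion (08:07:36.095). The helper below is hypothetical and ignores read and insert time.

```kotlin
// Back-of-the-envelope estimate: with every chunk sleeping `sleepSeconds` and at most
// `throttleLimit` chunks running at once, the chunks finish in ceil(chunks / throttleLimit) rounds.
// Reading, JDBC time, and thread start-up are ignored, so real runs take slightly longer.
fun estimatedSeconds(chunks: Int, throttleLimit: Int, sleepSeconds: Int): Int {
    require(chunks >= 0 && throttleLimit > 0 && sleepSeconds >= 0)
    val rounds = (chunks + throttleLimit - 1) / throttleLimit // integer ceiling division
    return rounds * sleepSeconds
}
```

By the same estimate, the default throttleLimit of 4 would give ceil(10 / 4) = 3 rounds, or about 15 seconds.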
2019-07-14 08:07:10.815 INFO 26547 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
2019-07-14 08:07:10.834 INFO 26547 --- [ task-1] s.springbatchsample.BatchConfiguration : write size: 10, items: [1, 4, 6, 7, 9, 11, 13, 15, 17, 19]
2019-07-14 08:07:10.834 INFO 26547 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [2, 3, 5, 8, 10, 12, 14, 16, 18, 20]
2019-07-14 08:07:15.889 INFO 26547 --- [ task-3] s.springbatchsample.BatchConfiguration : write size: 10, items: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
2019-07-14 08:07:15.898 INFO 26547 --- [ task-4] s.springbatchsample.BatchConfiguration : write size: 10, items: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
2019-07-14 08:07:20.932 INFO 26547 --- [ task-5] s.springbatchsample.BatchConfiguration : write size: 10, items: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
2019-07-14 08:07:20.947 INFO 26547 --- [ task-6] s.springbatchsample.BatchConfiguration : write size: 10, items: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
2019-07-14 08:07:25.974 INFO 26547 --- [ task-7] s.springbatchsample.BatchConfiguration : write size: 10, items: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
2019-07-14 08:07:25.990 INFO 26547 --- [ task-8] s.springbatchsample.BatchConfiguration : write size: 10, items: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
2019-07-14 08:07:31.018 INFO 26547 --- [ task-2] s.springbatchsample.BatchConfiguration : write size: 10, items: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
2019-07-14 08:07:31.037 INFO 26547 --- [ task-1] s.springbatchsample.BatchConfiguration : write size: 10, items: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
2019-07-14 08:07:36.095 INFO 26547 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{run.id=71}] and the following status: [COMPLETED]