Spring Batch – Scheduling Multiple Jobs Parallelly

Parallel Jobs

In this example I am going to show you how to run multiple batch jobs parallelly in Spring Batch framework. Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.

Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.

Prerequisites

Java 11/19, Spring Batch 2.4.2/3.1.2, Maven 3.6.3/3.8.5

Project Setup

You can create either gradle or maven based project in your favorite IDE or tool. The name of the project is spring-batch-multiple-parallel-jobs.

If you are creating maven based project then you can use the following pom.xml file.

For Spring Boot version 3.x you can use the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-batch-multiple-parallel-jobs</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>19</maven.compiler.source>
		<maven.compiler.target>19</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

For spring boot version 2.x you can use the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-batch-multiple-parallel-jobs</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>12</maven.compiler.source>
		<maven.compiler.target>12</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Model Class

The model class will be used in reader, writer, processor. The model class represents an object for the data.

public class Item {

	private String id;
	private LocalDate date;

	public Item() {
	}

	public Item(String id, LocalDate date) {
		this.id = id;
		this.date = date;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public LocalDate getDate() {
		return date;
	}

	public void setDate(LocalDate date) {
		this.date = date;
	}

	@Override
	public String toString() {
		return "ItemModel [id=" + id + ", date=" + date + "]";
	}

}

Item Reader

Item reader is an abstraction that represents the retrieval of input for a Step, one item at a time.

For spring boot version 3.x use the following class:

public class JobReader implements ItemReader<Item> {

	private Item item;

	public JobReader(Item item) {
		this.item = item;
	}

	@Override
	public Item read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		System.out.println("JobReader::read() -> item: " + item);

		return item;
	}

}

For spring boot version 2.x use the following class:

public class JobReader implements ItemReader<Item> {

	private Item item;

	public JobReader(Item item) {
		this.item = item;
	}

	@Override
	public Item read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		System.out.println("JobReader::read() -> item: " + item);

		return item;
	}

}

Item Writer

Item writer is an abstraction that represents the output of a Step, one batch or chunk of items at a time. Generally, an item writer has no knowledge of the input it will receive next, only the item that was passed in its current invocation. For spring boot 3.x the List<?> has been replaced by Chunk<?>.

For spring boot version 3.x use the following class:

public class JobWriter implements ItemWriter<Item> {

	private Item item = new Item();

	@Override
	public void write(Chunk<? extends Item> chunk) throws Exception {
		System.out.println("JobWriter::write() -> item: " + item);

		item.setId(chunk.getItems().get(0).getId());
		item.setDate(chunk.getItems().get(0).getDate());
	}

	public Item getOutput() {
		return item;
	}
	
}

For spring boot version 2.x use the following class:

public class JobWriter implements ItemWriter<Item> {

	private Item item = new Item();

	@Override
	public void write(List<? extends Item> items) throws Exception {
		System.out.println("JobWriter::write() -> item: " + item);

		item.setId(items.get(0).getId());
		item.setDate(items.get(0).getDate());
	}

	public Item getOutput() {
		return item;
	}

}

Item Processor

Item Processor is an abstraction that represents the business processing of an item. While the ItemReader reads one item, and the ItemWriter writes them, the ItemProcessor provides access to transform or apply other business processing. If, while processing the item, it is determined that the item is not valid, returning null indicates that the item should not be written out.

For spring boot version 3.x use the following class:

public class JobProcessor implements ItemProcessor<Item, Item> {

	@Override
	public Item process(Item item) throws Exception {
		System.out.println("JobProcessor::process() -> item: " + item);

		Item model = new Item();
		model.setId(item.getId());
		model.setDate(item.getDate());

		return model;
	}

}

For spring boot version 2.x use the following class:

public class JobProcessor implements ItemProcessor<Item, Item> {

	@Override
	public Item process(Item item) throws Exception {
		System.out.println("JobProcessor::process() -> item: " + item);

		Item model = new Item();
		model.setId(item.getId());
		model.setDate(item.getDate());

		return model;
	}

}

Listeners

I am creating listeners for execution of ItemReader, ItemWriter, Step, etc. Listeners help to intercept the executions of Job or Step and allow them to perform some functionalities.

Read Listener

You can perform some activities while you are reading input. You can do some activities before and after read of the input.

For spring boot version 3.x use the following class:

public class ReadListener<T> implements ItemReadListener<T> {

	@Override
	public void beforeRead() {
		System.out.println("ReaderListener::beforeRead()");
	}

	@Override
	public void afterRead(T item) {
		System.out.println("ReaderListener::afterRead() -> " + item);
	}

	@Override
	public void onReadError(Exception ex) {
		System.out.println("ReaderListener::onReadError() -> " + ex);
	}

}

For spring boot 2.x use the following class:

@Component
public class ReadListener<T> implements ItemReadListener<T> {

	@Override
	public void beforeRead() {
		System.out.println("ReaderListener::beforeRead()");
	}

	@Override
	public void afterRead(T item) {
		System.out.println("ReaderListener::afterRead() -> " + item);
	}

	@Override
	public void onReadError(Exception ex) {
		System.out.println("ReaderListener::onReadError() -> " + ex);
	}

}

Write Listener

Like read listener, you can also do some activities before and after write of your data.

For spring boot 3.x use the following class:

public class WriteListener<S> implements ItemWriteListener<S> {

	@Override
	public void beforeWrite(Chunk<? extends S> chunk) {
		System.out.println("ReaderListener::beforeWrite() -> " + chunk);
	}

	@Override
	public void afterWrite(Chunk<? extends S> chunk) {
		System.out.println("ReaderListener::afterWrite() -> " + chunk);
	}

	@Override
	public void onWriteError(Exception exception, Chunk<? extends S> chunk) {
		System.out.println("ReaderListener::onWriteError() -> " + exception + ", " + chunk);
	}

}

For spring boot 2.x use the following class:

@Component
public class WriteListener<S> implements ItemWriteListener<S> {

	@Override
	public void beforeWrite(List<? extends S> items) {
		System.out.println("ReaderListener::beforeWrite() -> " + items);
	}

	@Override
	public void afterWrite(List<? extends S> items) {
		System.out.println("ReaderListener::afterWrite() -> " + items);
	}

	@Override
	public void onWriteError(Exception exception, List<? extends S> items) {
		System.out.println("ReaderListener::onWriteError() -> " + exception + ", " + items);
	}

}

Step Listener

You can perform some activities before and after each step execution using step listener.

For spring boot version 3.x use the following class:

public class StepListener implements StepExecutionListener {

	@Override
	public void beforeStep(StepExecution stepExecution) {
		System.out.println("StepListener::beforeStep() -> Step " + stepExecution.getStepName() + " completed for "
				+ stepExecution.getJobExecution().getJobInstance().getJobName());
	}

	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		System.out.println("StepListener::afterStep() -> Step " + stepExecution.getStepName() + " started for "
				+ stepExecution.getJobExecution().getJobInstance().getJobName());

		return null;
	}

}

For spring boot version 2.x use the following class:

@Component
public class StepListener implements StepExecutionListener {

	@Override
	public void beforeStep(StepExecution stepExecution) {
		System.out.println("StepListener::beforeStep() -> Step " + stepExecution.getStepName() + " completed for "
				+ stepExecution.getJobExecution().getJobInstance().getJobName());
	}

	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		System.out.println("StepListener::afterStep() -> Step " + stepExecution.getStepName() + " started for "
				+ stepExecution.getJobExecution().getJobInstance().getJobName());

		return null;
	}

}

Job Listener

This job listener can be used to track the status of each job and you can also use it to stop a job.

For spring boot 3.x use the following class:

public class JobListener implements JobExecutionListener {

	private JobExecution active;

	@Autowired
	private JobOperator jobOperator;

	@Override
	public void beforeJob(JobExecution jobExecution) {
		final String jobName = jobExecution.getJobInstance().getJobName();
		final BatchStatus jobStatus = jobExecution.getStatus();

		System.out.println("JobListener::beforeJob() -> jobExecution: " + jobName + ", " + jobStatus.name());

		synchronized (jobExecution) {
			if (active != null && active.isRunning()) {
				System.out.println("JobListener::beforeJob(): isRunning() -> jobExecution: " + jobName + ", "
						+ jobStatus.isRunning());
				try {
					jobOperator.stop(jobExecution.getId());
				} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
					e.printStackTrace();
				}
			} else {
				active = jobExecution;
			}
		}
	}

	@Override
	public void afterJob(JobExecution jobExecution) {
		final String jobName = jobExecution.getJobInstance().getJobName();
		final BatchStatus jobStatus = jobExecution.getStatus();

		System.out.println("JobListener::afterJob() -> jobExecution: " + jobName + ", " + jobStatus.name());

		synchronized (jobExecution) {
			if (jobExecution == active) {
				active = null;
			}
		}
	}

}

For spring boot 2.x use the following class:

public class JobListener implements JobExecutionListener {

	private JobExecution active;

	@Autowired
	private JobOperator jobOperator;

	@Override
	public void beforeJob(JobExecution jobExecution) {
		final String jobName = jobExecution.getJobInstance().getJobName();
		final BatchStatus jobStatus = jobExecution.getStatus();

		System.out.println("JobListener::beforeJob() -> jobExecution: " + jobName + ", " + jobStatus.getBatchStatus());

		synchronized (jobExecution) {
			if (active != null && active.isRunning()) {
				System.out.println("JobListener::beforeJob(): isRunning() -> jobExecution: " + jobName + ", "
						+ jobStatus.isRunning());
				try {
					jobOperator.stop(jobExecution.getId());
				} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
					e.printStackTrace();
				}
			} else {
				active = jobExecution;
			}
		}
	}

	@Override
	public void afterJob(JobExecution jobExecution) {
		final String jobName = jobExecution.getJobInstance().getJobName();
		final BatchStatus jobStatus = jobExecution.getStatus();

		System.out.println("JobListener::afterJob() -> jobExecution: " + jobName + ", " + jobStatus.getBatchStatus());

		synchronized (jobExecution) {
			if (jobExecution == active) {
				active = null;
			}
		}
	}

}

Job Configuration

Finally I am configuring multiple jobs to run them parallelly. I am configuring three jobs and scheduled at fixed rate.

A default simple implementation of the Job interface is provided by Spring Batch in the form of the SimpleJob class which creates some standard functionality on top of Job, however the batch namespace abstracts away the need to instantiate it directly.

A Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing.

TransactionManager – Spring’s that will be used to begin and commit transactions during processing.

Chunk – The number of items that will be processed before the transaction is committed.

JobRepository is the persistence mechanism. It provides CRUD operations for JobLauncher, Job and Step implementations. When a Job is first launched, a JobExecution is obtained from the repository, and during the course of execution StepExecution and JobExecution implementations are persisted by passing them to the repository.

JonLauncher represents a simple interface for launching a Job with a given set of JobParameters.

For spring boot version 3.x you can use the following class. In spring boot 3.x the JobBuilderFactory has been replaced by JobBuilder and StepBuilderFactory has been replaced by StepBuilder. You also don’t need to use @EnableBatchProcessing annotation on the class.

@Configuration
@EnableScheduling
public class JobConfiguration {

	@Autowired
	private DataSource dataSource;

	@Autowired
	private PlatformTransactionManager platformTransactionManager;

	@Bean
	public ItemReader<Item> jobReader() {
		return new JobReader(new Item("1000", LocalDate.now()));
	}

	@Bean
	public ItemProcessor<Item, Item> jobProcessor() {
		return new JobProcessor();
	}

	@Bean
	public ItemWriter<Item> jobWriter() {
		return new JobWriter();
	}

	@Bean
	public ItemReadListener<Item> readListener() {
		return new ReadListener<Item>();
	}

	@Bean
	public StepExecutionListener stepListener() {
		return new StepListener();
	}

	@Bean
	public ItemWriteListener<Item> writeListener() {
		return new WriteListener<Item>();
	}

	@Bean
	public JobExecutionListener jobListener() {
		return new JobListener();
	}

	@Bean
	public JobExplorer jobExplorer() throws Exception {
		JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
		factoryBean.setDataSource(dataSource);
		return factoryBean.getObject();
	}

	@Bean
	public Step step1() throws Exception {
		return new StepBuilder("Step1", jobRepository()).listener(stepListener()).listener(readListener())
				.listener(writeListener()).tasklet((contribution, chunkContext) -> {
					System.out.println("Tasklet has run");
					return RepeatStatus.FINISHED;
				}, platformTransactionManager).build();
	}

	@Bean
	public Step step2() throws Exception {
		return new StepBuilder("Step2", jobRepository()).listener(stepListener()).listener(readListener())
				.listener(writeListener()).<String, String>chunk(3, platformTransactionManager)
				.reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6")))
				.processor(new ItemProcessor<String, String>() {
					@Override
					public String process(String item) throws Exception {
						return String.valueOf(Integer.parseInt(item) * -1);
					}
				}).writer(items -> {
					for (String item : items) {
						System.out.println(">> " + item);
					}
				}).build();
	}

	@Bean
	public Job job1() throws Exception {
		return new JobBuilder("Job1", jobRepository()).listener(jobListener()).incrementer(new RunIdIncrementer())
				.start(step1()).next(step2()).build();
	}

	@Bean
	public Step anotherStep() throws Exception {
		return new StepBuilder("AnotherStep", jobRepository()).listener(stepListener()).listener(readListener())
				.listener(writeListener()).tasklet((contribution, chunkContext) -> {
					System.out.println("Yet another Tasklet!");
					return RepeatStatus.FINISHED;
				}, platformTransactionManager).build();
	}

	@Bean
	public Job job2() throws Exception {
		return new JobBuilder("Job2", jobRepository()).listener(jobListener()).incrementer(new RunIdIncrementer())
				.start(anotherStep()).build();
	}

	@Bean
	public Job job3() throws Exception {
		Step step = new StepBuilder("Job3Step", jobRepository()).listener(readListener()).listener(stepListener())
				.listener(writeListener()).<Item, Item>chunk(1, platformTransactionManager).reader(jobReader())
				.processor(jobProcessor()).writer(jobWriter()).build();

		return new JobBuilder("Job3", jobRepository()).listener(jobListener()).start(step).build();
	}

	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		taskExecutor.setCorePoolSize(15);
		taskExecutor.setMaxPoolSize(20);
		taskExecutor.setQueueCapacity(30);
		return taskExecutor;
	}

	@Bean
	public JobRepository jobRepository() throws Exception {
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDataSource(dataSource);
		factory.setTransactionManager(platformTransactionManager);
		factory.afterPropertiesSet();
		return factory.getObject();
	}

	@Bean
	public JobLauncher jobLauncher() throws Exception {
		TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
		jobLauncher.setTaskExecutor(taskExecutor()); // Or below line
		// jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
		jobLauncher.setJobRepository(jobRepository());
		jobLauncher.afterPropertiesSet();
		return jobLauncher;
	}

	@Bean
	public JobOperator jobOperator(JobRegistry jobRegistry) throws Exception {
		SimpleJobOperator jobOperator = new SimpleJobOperator();
		jobOperator.setJobExplorer(jobExplorer());
		jobOperator.setJobLauncher(jobLauncher());
		jobOperator.setJobRegistry(jobRegistry);
		jobOperator.setJobRepository(jobRepository());
		return jobOperator;
	}

	@Bean
	public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
		JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
		jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
		return jobRegistryBeanPostProcessor;
	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run1() {
		JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
				.toJobParameters();
		try {
			jobLauncher().run(job1(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}

	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run2() {
		JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
				.toJobParameters();
		try {
			jobLauncher().run(job2(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}
	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run3() {
		JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
				.toJobParameters();
		try {
			jobLauncher().run(job3(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}
	}

}

For spring boot 2.x use the following class:

@Configuration
@EnableScheduling
@EnableBatchProcessing
public class JobConfiguration {

	@Autowired
	private JobExplorer jobExplorer;

	@Autowired
	private JobRepository jobRepository;

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Bean
	public ItemReader<Item> jobReader() {
		return new JobReader(new Item("1000", LocalDate.now()));
	}

	@Bean
	public ItemProcessor<Item, Item> jobProcessor() {
		return new JobProcessor();
	}

	@Bean
	public ItemWriter<Item> jobWriter() {
		return new JobWriter();
	}

	@Bean
	public ItemReadListener<Item> readListener() {
		return new ReadListener<Item>();
	}

	@Bean
	public StepExecutionListener stepListener() {
		return new StepListener();
	}

	@Bean
	public ItemWriteListener<Item> writeListener() {
		return new WriteListener<Item>();
	}

	@Bean
	public JobExecutionListener jobListener() {
		return new JobListener();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory.get("Step1").listener(stepListener()).listener(readListener())
				.listener(writeListener()).tasklet((contribution, chunkContext) -> {
					System.out.println("Tasklet has run");
					return RepeatStatus.FINISHED;
				}).build();
	}

	@Bean
	public Step step2() {
		return stepBuilderFactory.get("Step2").listener(stepListener()).listener(readListener())
				.listener(writeListener()).<String, String>chunk(3)
				.reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6")))
				.processor(new ItemProcessor<String, String>() {
					@Override
					public String process(String item) throws Exception {
						return String.valueOf(Integer.parseInt(item) * -1);
					}
				}).writer(items -> {
					for (String item : items) {
						System.out.println(">> " + item);
					}
				}).build();
	}

	@Bean
	public Job job1() {
		return this.jobBuilderFactory.get("Job1").listener(jobListener()).incrementer(new RunIdIncrementer())
				.start(step1()).next(step2()).build();
	}

	@Bean
	public Step anotherStep() {
		return this.stepBuilderFactory.get("AnotherStep").listener(stepListener()).listener(readListener())
				.listener(writeListener()).tasklet((contribution, chunkContext) -> {
					System.out.println("Yet another Tasklet!");
					return RepeatStatus.FINISHED;
				}).build();
	}

	@Bean
	public Job job2() {
		return this.jobBuilderFactory.get("Job2").listener(jobListener()).incrementer(new RunIdIncrementer())
				.start(anotherStep()).build();
	}

	@Bean
	public Job job3() {
		Step step = stepBuilderFactory.get("Job3Step").listener(readListener()).listener(stepListener())
				.listener(writeListener()).<Item, Item>chunk(1).reader(jobReader()).processor(jobProcessor())
				.writer(jobWriter()).build();

		return jobBuilderFactory.get("Job3").listener(jobListener()).start(step).build();
	}

	@Bean
	public ResourcelessTransactionManager transactionManager() {
		return new ResourcelessTransactionManager();
	}

	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		taskExecutor.setCorePoolSize(15);
		taskExecutor.setMaxPoolSize(20);
		taskExecutor.setQueueCapacity(30);
		return taskExecutor;
	}

	@Bean
	public JobLauncher jobLauncher() throws Exception {
		SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
		jobLauncher.setTaskExecutor(taskExecutor()); // Or below line
		// jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
		jobLauncher.setJobRepository(jobRepository);
		jobLauncher.afterPropertiesSet();
		return jobLauncher;
	}

	@Bean
	public JobOperator jobOperator(JobRegistry jobRegistry) throws Exception {
		SimpleJobOperator jobOperator = new SimpleJobOperator();
		jobOperator.setJobExplorer(jobExplorer);
		jobOperator.setJobLauncher(jobLauncher());
		jobOperator.setJobRegistry(jobRegistry);
		jobOperator.setJobRepository(jobRepository);
		return jobOperator;
	}

	@Bean
	public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
		JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
		jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
		return jobRegistryBeanPostProcessor;
	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run1() {
		Map<String, JobParameter> confMap = new HashMap<>();
		confMap.put("time", new JobParameter(System.currentTimeMillis()));
		JobParameters jobParameters = new JobParameters(confMap);
		try {
			jobLauncher().run(job1(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}

	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run2() {
		Map<String, JobParameter> confMap = new HashMap<>();
		confMap.put("time", new JobParameter(System.currentTimeMillis()));
		JobParameters jobParameters = new JobParameters(confMap);
		try {
			jobLauncher().run(job2(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}
	}

	// @Scheduled(cron = "*/5 * * * * *")
	@Scheduled(fixedRate = 10000)
	public void run3() {
		Map<String, JobParameter> confMap = new HashMap<>();
		confMap.put("time", new JobParameter(System.currentTimeMillis()));
		JobParameters jobParameters = new JobParameters(confMap);
		try {
			jobLauncher().run(job3(), jobParameters);
		} catch (Exception ex) {
			System.err.println(ex.getMessage());
		}
	}
}

Application Config

I am going to create application.properties file under src/main/resources folder in order to override bean definition and give a name to the job. This is required for spring boot version 3.x.

spring.main.allow-bean-definition-overriding=true

spring.batch.job.name=singleStepJob
#spring.batch.job.name=multipleStepJob

If only one job is present in Spring Context then Spring runs the job on startup, but if multiple jobs are present in Spring Context then you need to tell Spring which job to run on startup. Hence you need the key/value pair spring.batch.job.name=singleStepJob or spring.batch.job.name=multipleStepJob.

If you do not provide the job name key/value pair you may get the following exception while running the application:

Caused by: java.lang.IllegalArgumentException: Job name must be specified in case of multiple jobs

DataSource Configuration

In spring boot 3.x or spring batch 5.x, you need to store the meta data for batch process into the table. So I am using H2 database for this purpose.

@Configuration
public class DataSourceConfig {

	@Bean
	public DataSource dataSource() {
		EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
		return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
				.addScript("classpath:org/springframework/batch/core/schema-h2.sql").setType(EmbeddedDatabaseType.H2)
				.build();
	}

	@Bean
	public PlatformTransactionManager platformTransactionManager() {
		return new DataSourceTransactionManager(dataSource());
	}

}

Main Class

A class having main method and @SpringBootApplication annotation is enough to deploy the application into embedded Tomcat server.

@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class SpringBatchMultipleJobsApp {

	public static void main(String[] args) {
		SpringApplication.run(SpringBatchMultipleJobsApp.class, args);
	}

}

In the above class I have excluded DataSourceAutoConfiguration class as I am not using any kind of database and memory based map will be used for processing jobs.

Testing the Application

Executing the above main class you will find that all jobs started one after another and are running parallelly.

09:15:10.002  INFO 13544 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
09:15:10.151  INFO 13544 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
09:15:10.215  INFO 13544 --- [  restartedMain] c.r.s.b.m.p.j.SpringBatchMultipleJobsApp : Started SpringBatchMultipleJobsApp in 3.875 seconds (JVM running for 5.179)
09:15:10.424  INFO 13544 --- [ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job2]] launched with the following parameters: [{time=1610336710218}]
09:15:10.461  INFO 13544 --- [ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job3]] launched with the following parameters: [{time=1610336710423}]
09:15:10.462  INFO 13544 --- [ taskExecutor-3] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job1]] launched with the following parameters: [{time=1610336710443}]
JobListener::beforeJob() -> jobExecution: Job3, STARTED
JobListener::beforeJob() -> jobExecution: Job2, STARTED
JobListener::beforeJob(): isRunning() -> jobExecution: Job2, true
JobListener::beforeJob() -> jobExecution: Job1, STARTED
JobListener::beforeJob(): isRunning() -> jobExecution: Job1, true
09:15:10.542  INFO 13544 --- [ taskExecutor-2] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Job3Step]
StepListener::beforeStep() -> Step Job3Step completed for Job3
09:15:10.628  INFO 13544 --- [ taskExecutor-3] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Step1]
09:15:10.649  INFO 13544 --- [ taskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [AnotherStep]
09:15:10.653  INFO 13544 --- [ taskExecutor-3] o.s.b.c.r.support.SimpleJobRepository    : Parent JobExecution is stopped, so passing message on to StepExecution
StepListener::beforeStep() -> Step Step1 completed for Job1
09:15:10.658  INFO 13544 --- [ taskExecutor-1] o.s.b.c.r.support.SimpleJobRepository    : Parent JobExecution is stopped, so passing message on to StepExecution
StepListener::beforeStep() -> Step AnotherStep completed for Job2
09:15:10.663  INFO 13544 --- [ taskExecutor-3] o.s.b.c.s.ThreadStepInterruptionPolicy   : Step interrupted through StepExecution
09:15:10.664  INFO 13544 --- [ taskExecutor-3] o.s.batch.core.step.AbstractStep         : Encountered interruption executing step Step1 in job Job1 : Job interrupted status detected.
StepListener::afterStep() -> Step Step1 started for Job1
JobReader::read() -> item: ItemModel [id=1000, date=2021-01-11]
09:15:10.672  INFO 13544 --- [ taskExecutor-1] o.s.b.c.s.ThreadStepInterruptionPolicy   : Step interrupted through StepExecution
09:15:10.675  INFO 13544 --- [ taskExecutor-1] o.s.batch.core.step.AbstractStep         : Encountered interruption executing step AnotherStep in job Job2 : Job interrupted status detected.
StepListener::afterStep() -> Step AnotherStep started for Job2
JobProcessor::process() -> item: ItemModel [id=1000, date=2021-01-11]
JobWriter::write() -> item: ItemModel [id=null, date=null]
09:15:10.694  INFO 13544 --- [ taskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [Step1] executed in 64ms
09:15:10.696  INFO 13544 --- [ taskExecutor-3] o.s.b.c.r.support.SimpleJobRepository    : Parent JobExecution is stopped, so passing message on to StepExecution
JobReader::read() -> item: ItemModel [id=1000, date=2021-01-11]
JobProcessor::process() -> item: ItemModel [id=1000, date=2021-01-11]
JobWriter::write() -> item: ItemModel [id=1000, date=2021-01-11]
09:15:10.697  INFO 13544 --- [ taskExecutor-3] o.s.batch.core.job.AbstractJob           : Encountered interruption executing job: Job interrupted by step execution
09:15:10.702  INFO 13544 --- [ taskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [AnotherStep] executed in 52ms
JobListener::afterJob() -> jobExecution: Job1, STOPPED
09:15:10.706  INFO 13544 --- [ taskExecutor-1] o.s.b.c.r.support.SimpleJobRepository    : Parent JobExecution is stopped, so passing message on to StepExecution
09:15:10.709  INFO 13544 --- [ taskExecutor-1] o.s.batch.core.job.AbstractJob           : Encountered interruption executing job: Job interrupted by step execution
JobListener::afterJob() -> jobExecution: Job2, STOPPED
09:15:10.716  INFO 13544 --- [ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job2]] completed with the following parameters: [{time=1610336710218}] and the following status: [STOPPED] in 216ms
09:15:10.717  INFO 13544 --- [ taskExecutor-3] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job1]] completed with the following parameters: [{time=1610336710443}] and the following status: [STOPPED] in 209ms
JobReader::read() -> item: ItemModel [id=1000, date=2021-01-11]
JobProcessor::process() -> item: ItemModel [id=1000, date=2021-01-11]
JobWriter::write() -> item: ItemModel [id=1000, date=2021-01-11]
...

If jobs were not running parallelly then you would have seen the output sequentially:

08:17:39.163  INFO 14220 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job1]] launched with the following parameters: [{run.id=1}]
JobListener::beforeJob() -> jobExecution: Job1, STARTED
08:17:39.255  INFO 14220 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Step1]
StepListener::beforeStep() -> Step Step1 completed for Job1
Tasklet has run
StepListener::afterStep() -> Step Step1 started for Job1
08:17:39.299  INFO 14220 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [Step1] executed in 44ms
08:17:39.316  INFO 14220 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Step2]
StepListener::beforeStep() -> Step Step2 completed for Job1
>> -1
>> -2
>> -3
>> -4
>> -5
>> -6
StepListener::afterStep() -> Step Step2 started for Job1
08:17:39.374  INFO 14220 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [Step2] executed in 58ms
JobListener::afterJob() -> jobExecution: Job1, COMPLETED
08:17:39.388  INFO 14220 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job1]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 165ms
08:17:39.400  INFO 14220 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job2]] launched with the following parameters: [{run.id=1}]
JobListener::beforeJob() -> jobExecution: Job2, STARTED
08:17:39.413  INFO 14220 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [AnotherStep]
StepListener::beforeStep() -> Step AnotherStep completed for Job2
Yet another Tasklet!
StepListener::afterStep() -> Step AnotherStep started for Job2
08:17:39.424  INFO 14220 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [AnotherStep] executed in 11ms
JobListener::afterJob() -> jobExecution: Job2, COMPLETED
08:17:39.444  INFO 14220 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job2]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 27ms
08:17:39.448  INFO 14220 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Job3]] launched with the following parameters: [{}]
JobListener::beforeJob() -> jobExecution: Job3, STARTED
08:17:39.461  INFO 14220 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Job3Step]
StepListener::beforeStep() -> Step Job3Step completed for Job3
JobReader::read() -> item: ItemModel [id=1000, date=2021-01-11]
JobProcessor::process() -> item: ItemModel [id=1000, date=2021-01-11]
JobWriter::write() -> item: ItemModel [id=null, date=null]
...

That’s all about how to run multiple jobs parallelly using Spring Batch framework.

Source Code

Download

1 thought on “Spring Batch – Scheduling Multiple Jobs Parallelly

  1. Is this able to happen only because they are not calling the same reader/processor/writer?

    Would the data be corrupted if 2 threads call the same reader at the same time?

Leave a Reply

Your email address will not be published. Required fields are marked *