Spring Boot Spring Batch Quartz Scheduler Integration example
In fact, we will create an application which reads
data from a database table (CUSTOMER_INPUT) and writes this data in another
table CUSTOMER_OUTPUT.
The two tables have the same columns.
- ID : primary key
- Firstname : String
- Lastname : String
Spring batch is used to read data from the table
CUSTOMER_INPUT and store it in the table CUSTOMER_OUTPUT.
So, in spring batch we have a custom reader that reads data
from CUSTOMER_INPUT using pagination and a custom writer that writes data into
CUSTOMER_OUTPUT.
Here, we have no processor.
The code of the domain objects is the following :
CustomerInput.java
package io.spring.batch.domain; import javax.persistence.*; public class CustomerInput { @Id private long id; private String firstName; private String lastName; public CustomerInput() { } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public CustomerInput(long id, String firstName, String lastName) { this.id = id; this.firstName = firstName; this.lastName = lastName; } @Override public String toString() { return "CustomerInput{" + "id=" + id + ", firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + '}'; } }
CustomerOutput.java
package io.spring.batch.domain; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; public class CustomerOutput { @Id private long id; private String firstName; private String lastName; public CustomerOutput() { } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public CustomerOutput(long id, String firstName, String lastName) { this.id = id; this.firstName = firstName; this.lastName = lastName; } @Override public String toString() { return "CustomerOutput{" + "id=" + id + ", firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + '}'; } }
The row mapper that maps every row in the CUSTOMER_INPUT
table to a CustomerInput object.
CustomerRowMapper.java
package io.spring.batch.domain; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; public class CustomerRowMapper implements RowMapper{ @Override public CustomerInput mapRow(ResultSet resultSet, int i) throws SQLException { return new CustomerInput(resultSet.getLong("ID"), resultSet.getString("FIRSTNAME"), resultSet.getString("LASTNAME")); } }
The configuration of Spring batch is in the JobConfiguration class :
JobConfiguration.java
package io.spring.batch.configuration; import java.util.HashMap; import java.util.Map; import javax.sql.DataSource; import io.spring.batch.domain.CustomerInput; import io.spring.batch.domain.CustomerOutput; import io.spring.batch.domain.CustomerRowMapper; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.JdbcPagingItemReader; import org.springframework.batch.item.database.Order; import org.springframework.batch.item.database.support.MySqlPagingQueryProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Michael Minella */ @Configuration public class JobConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public DataSource dataSource; @Bean public JdbcPagingItemReaderpagingItemReader() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); reader.setFetchSize(10); reader.setRowMapper(new CustomerRowMapper()); MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("ID, FIRSTNAME, LASTNAME"); queryProvider.setFromClause("from CUSTOMER_INPUT"); Map sortKeys = new HashMap<>(1); sortKeys.put("ID", Order.ASCENDING); queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider); return reader; } @Bean public JdbcBatchItemWriter customerItemWriter() { JdbcBatchItemWriter itemWriter = new JdbcBatchItemWriter<>(); itemWriter.setDataSource(this.dataSource); itemWriter.setSql("INSERT INTO CUSTOMER_OUTPUT(FIRSTNAME,LASTNAME) VALUES (:firstName, :lastName)"); itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()); itemWriter.afterPropertiesSet(); return itemWriter; } @Bean public Step step1() { return stepBuilderFactory.get("step1") . chunk(10) .reader(pagingItemReader()) .writer(customerItemWriter()) .build(); } @Bean public Job job() { return jobBuilderFactory.get("job") .start(step1()) .build(); } }
Here, the job Spring batch is composed by a single step that reads the data from CUSTOMER_INPUT and writes it in CUSTOMER_OUTPUT.
The custom Quartz job is the following :
QuartzJobLauncher.java
package io.spring.batch.quartz; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.configuration.JobLocator; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.quartz.QuartzJobBean; public class QuartzJobLauncher extends QuartzJobBean { private String jobName; private JobLauncher jobLauncher; private JobLocator jobLocator; public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public JobLauncher getJobLauncher() { return jobLauncher; } public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; } public JobLocator getJobLocator() { return jobLocator; } public void setJobLocator(JobLocator jobLocator) { this.jobLocator = jobLocator; } @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters(); try { ApplicationContext applicationContext = (ApplicationContext) context.getScheduler().getContext().get("applicationContext"); jobLocator = (JobLocator) applicationContext.getBean(JobLocator.class); jobLauncher = (JobLauncher) applicationContext.getBean(JobLauncher.class); Job job = jobLocator.getJob(jobName); JobParameters params = new JobParametersBuilder() .addString("JobID", String.valueOf(System.currentTimeMillis())) .toJobParameters(); jobLauncher.run(job, params); } catch (Exception e) { e.printStackTrace(); } } }
The Quartz configuration is presented in the following file :
QuartzConfig.java
package io.spring.batch.configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import io.spring.batch.quartz.QuartzJobLauncher; import org.quartz.*; import org.springframework.batch.core.configuration.JobLocator; import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.quartz.CronTriggerFactoryBean; import org.springframework.scheduling.quartz.JobDetailFactoryBean; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import static org.quartz.CronScheduleBuilder.cronSchedule; @Configuration public class QuartzConfig { @Autowired private JobLauncher jobLauncher; @Autowired private JobLocator jobLocator; @Value("${cronExpression}") private String cronExpression; @Bean public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) { JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); return jobRegistryBeanPostProcessor; } @Bean public JobDetail jobDetail() { //Set Job data map JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("jobName", "job"); return JobBuilder.newJob(QuartzJobLauncher.class) .withIdentity("job",null) .setJobData(jobDataMap) .storeDurably() .build(); } @Bean public Trigger jobTrigger() { return TriggerBuilder .newTrigger() .forJob(jobDetail()) .withIdentity("jobTrigger",null) .withSchedule(cronSchedule(cronExpression)) .build(); } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException, SchedulerException { SchedulerFactoryBean scheduler = new SchedulerFactoryBean(); scheduler.setTriggers(jobTrigger()); scheduler.setQuartzProperties(quartzProperties()); scheduler.setJobDetails(jobDetail()); scheduler.setApplicationContextSchedulerContextKey("applicationContext"); return scheduler; } public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/application.properties")); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } }
The main class is the SpringQuartzIntegration class. It is the entry point to the app.
SpringQuartzIntegration.java
package io.spring.batch; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.EntityScan; /** * Created by Abderrahmen on 08/06/2020. */ @SpringBootApplication @EnableBatchProcessing(modular = false) @EntityScan("io.spring.batch.domain") public class SpringQuartzIntegration { public static void main(String[] args) { SpringApplication.run(SpringQuartzIntegration.class, args); } }
The application.properties file is the following :
application.properties
spring.datasource.url=jdbc:h2:file:~/spring-boot-h2-db;DATABASE_TO_UPPER=false spring.datasource.driverClassName=org.h2.Driver spring.datasource.username=ha spring.datasource.password=ha spring.h2.console.enabled=true spring.jpa.show-sql=true spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect #spring.batch.job.enabled =false spring.h2.console.path=/h2 spring.jpa.database-platform=org.hibernate.dialect.H2Dialect spring.jpa.generate-ddl=true spring.jpa.hibernate.ddl-auto=update logging.level.org.hibernate.SQL=DEBUG logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl spring.application.name=batch purge logging.level.org.springframework.cloud.task=DEBUG scheduler.enabled=true cronExpression=*/10 * * * * ? * # Spring Batch properties spring.batch.job.enabled=false #Quartz #Quartz persistent jobStore config org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix=QRTZ_ org.quartz.jobStore.dataSource=myDS #org.quartz.jobStore.useProperties=false org.quartz.jobStore.isClustered=false #Quartz dataSource org.quartz.dataSource.myDS.driver=org.h2.Driver #org.quartz.dataSource.myDS.URL=jdbc:h2:file:~/h2/testdb;INIT=RUNSCRIPT FROM 'classpath:schema-quartz.sql' #org.quartz.dataSource.myDS.URL=jdbc:h2:file:~/spring-boot-h2-db;DB_CLOSE_ON_EXIT=FALSE;MODE=PostgreSQL org.quartz.dataSource.myDS.URL=jdbc:h2:file:~/h2/testdb;INIT=RUNSCRIPT FROM 'classpath:schema-quartz.sql' org.quartz.dataSource.myDS.user=ha org.quartz.dataSource.myDS.password =ha org.quartz.dataSource.myDS.maxConnections=5 org.quartz.dataSource.myDS.validationQuery=select 1 spring.quartz.jdbc.comment-prefix=# #spring.quartz.jdbc.initialize-schema = never spring.quartz.job-store-type= jdbc org.quartz.jobStore.useProperties= false
Here the metadata tables of Spring batch are stored in a H2 database presented by the file spring-boot-h2-db
While the metadata of Quartz are written in another H2
database stored in the file testdb.
The file data.sql is used to insert data in the CUSTOMER_INPUT table when the app starts :
data.sql
INSERT INTO CUSTOMER_INPUT (FIRSTNAME,LASTNAME) VALUES ('Reed','Edwards');
The file schema.sql contain the creation script of the CUSTOMER_INPUT and the CUSTOMER_OUTPUT tables.
Schema.sql
DROP TABLE IF EXISTS CUSTOMER_INPUT; DROP TABLE IF EXISTS CUSTOMER_OUTPUT; CREATE TABLE CUSTOMER_INPUT ( ID INTEGER NOT NULL auto_increment, FIRSTNAME CHARACTER(255) default NULL, LASTNAME CHARACTER(255) default NULL, PRIMARY KEY (ID) ); CREATE TABLE CUSTOMER_OUTPUT ( ID INTEGER NOT NULL auto_increment, FIRSTNAME CHARACTER(255) default NULL, LASTNAME CHARACTER(255) default NULL, PRIMARY KEY (ID) );
The file schema-quartz.sql contain the creation script of
the metadata tables of Quartz
schema-quartz.sql
The pom.xml is the following :
Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.batch</groupId>
<artifactId>batch-integration</artifactId>
<packaging>jar</packaging>
<version>2.3.0-SNAPSHOT</version>
<description>Batch Integration</description>
<name>Batch Quartz integration</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Here we arrived at the end of the spring boot 2, Spring
batch and Quartz integration tutorial.
In order to check our core java tutorial, please like our howto program facebook page and subscribe to our newsletter to get our latest
tutorials.
To download the project of this tutorial, please check my Github.
Post a Comment