Many applications in the enterprise domain require bulk processing to perform business operations in mission-critical environments. These business operations involve automated, complex processing of large volumes of information that is most efficiently handled without user interaction. Typical examples include time-based events (e.g. month-end calculations, notices, or correspondence); the periodic application of complex business rules, processed repetitively across very large data sets (e.g. insurance benefit determination or rate adjustments); and the integration of information received from internal and external systems, which typically requires formatting, validation, and transactional processing into the system of record. Batch processing is used to process billions of transactions every day for enterprises.
What's New in Spring Batch 2.0
The Spring Batch 2.0 release has six major themes:
- Java 5
- Non Sequential Step Execution
- Chunk oriented processing
- Meta Data enhancements
- Scalability
- Configuration
The Domain Language of Batch

Job

A Job is an entity that encapsulates an entire batch process. As is common with other Spring projects, a Job is wired together via an XML configuration file. This file may be referred to as the "job configuration". However, a Job is just the top of an overall hierarchy.

JobInstance
A JobInstance refers to the concept of a logical job run.

JobParameters

Having discussed JobInstance and how it differs from Job, the natural question to ask is: "How is one JobInstance distinguished from another?" The answer is JobParameters. JobParameters is a set of parameters used to start a batch job.

JobExecution
A JobExecution refers to the technical concept of a single attempt to run a Job.

Step

A Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing.

StepExecution

A StepExecution represents a single attempt to execute a Step.
ExecutionContext

An ExecutionContext represents a collection of key/value pairs that are persisted and controlled by the framework in order to give developers a place to store persistent state that is scoped to a StepExecution or JobExecution.

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
// ecStep does not equal ecJob
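Conceptually, an ExecutionContext behaves like a persisted map of key/value pairs, and the step-scoped and job-scoped contexts are distinct. The sketch below imitates that behaviour with plain java.util.Map instances so it can run without the framework; the key name `reader.count` is only an illustration, not an API of Spring Batch:

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutionContextSketch {
    public static void main(String[] args) {
        // Stand-ins for the step-scoped and job-scoped contexts
        Map<String, Object> ecStep = new HashMap<>();
        Map<String, Object> ecJob = new HashMap<>();

        // State stored in the step context survives a restart of the step,
        // but it is not visible through the job context: they are separate scopes
        ecStep.put("reader.count", 42);
        System.out.println(ecStep.containsKey("reader.count")); // the step sees it
        System.out.println(ecJob.containsKey("reader.count"));  // the job does not
    }
}
```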
JobRepository

JobRepository is the persistence mechanism for all of the stereotypes mentioned above. It provides CRUD operations for JobLauncher, Job, and Step implementations. When a Job is first launched, a JobExecution is obtained from the repository, and during the course of execution StepExecution and JobExecution implementations are persisted by passing them to the repository.

JobLauncher

JobLauncher represents a simple interface for launching a Job with a given set of JobParameters.

Item Reader
ItemReader is an abstraction that represents the retrieval of input for a Step, one item at a time. When the ItemReader has exhausted the items it can provide, it indicates this by returning null.

Item Writer

ItemWriter is an abstraction that represents the output of a Step, one batch or chunk of items at a time.

Item Processor

ItemProcessor is an abstraction that represents the business processing of an item. While the ItemReader reads one item and the ItemWriter writes them, the ItemProcessor provides access to transform or apply other business processing.

Configuring and Running a Job
Let's look at a small application that uses batch processing in practice.
Here is our applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:j="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

    <bean id="jobListener" class="com.opensourzesupport.batch.job.listeners.JobListener" />
    <bean id="dataReader" class="com.opensourzesupport.batch.job.CustomReader" />
    <bean id="dataWriter" class="com.opensourzesupport.batch.job.CustomWriter" />
    <bean id="dataProcessor" class="com.opensourzesupport.batch.job.CustomProcessor" />

    <!-- Billing job flow -->
    <job id="job_dataproc" xmlns="http://www.springframework.org/schema/batch" restartable="true">
        <step id="step1">
            <tasklet task-executor="pooledTaskExecutor" throttle-limit="1">
                <chunk reader="dataReader" writer="dataWriter"
                    processor="dataProcessor" commit-interval="2" />
            </tasklet>
        </step>
        <listeners>
            <listener ref="jobListener" />
        </listeners>
    </job>

    <!-- pooledTaskExecutor is used for chunk processing; otherwise a separate
         thread would be spawned for each chunk being processed -->
    <bean id="pooledTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="2" />
        <property name="maxPoolSize" value="10" />
        <property name="queueCapacity" value="25" />
    </bean>

    <j:job-repository id="jobRepository" data-source="applicationSettingsDBdataSource"
        transaction-manager="transactionManager" isolation-level-for-create="SERIALIZABLE"
        table-prefix="BATCH_" />

    <bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"
        lazy-init="true" />

    <!-- Optional: a JobRegistry (and its parent interface JobLocator) is not mandatory,
         but it is useful if you want to keep track of which jobs are available in the
         context, and for collecting jobs centrally -->
    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

    <!-- Optional: used only if a JobRegistry is used. This bean post-processor
         registers all jobs as they are created -->
    <bean id="jobRegistryBeanPostProcessor"
        class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
        <property name="jobRegistry" ref="jobRegistry" />
    </bean>

    <!-- The following beans relate to asynchronous job launching -->
    <bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

    <bean id="asyncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="asyncTaskExecutor" />
    </bean>

    <!-- orderdata monthly, adhoc, and billing services use asyncJobOperator -->
    <bean id="asyncJobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator">
        <property name="jobExplorer">
            <bean class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
                <property name="dataSource" ref="applicationSettingsDBdataSource" />
            </bean>
        </property>
        <property name="jobRepository" ref="jobRepository" />
        <property name="jobRegistry" ref="jobRegistry" />
        <property name="jobLauncher" ref="asyncJobLauncher" />
    </bean>
    <!-- Ends asynchronous job launching -->

    <!-- The following beans relate to synchronous job launching;
         currently no job uses syncJobOperator -->
    <bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />

    <bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="syncTaskExecutor" />
    </bean>

    <bean id="syncJobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator">
        <property name="jobExplorer">
            <bean class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
                <property name="dataSource" ref="applicationSettingsDBdataSource" />
            </bean>
        </property>
        <property name="jobRepository" ref="jobRepository" />
        <property name="jobRegistry" ref="jobRegistry" />
        <property name="jobLauncher" ref="syncJobLauncher" />
    </bean>

    <bean id="applicationSettingsDBdataSource" class="oracle.jdbc.pool.OracleDataSource"
        destroy-method="close">
        <property name="URL" value="oracle.url" />
        <property name="user" value="username" />
        <property name="password" value="password" />
    </bean>

    <bean class="com.opensourzesupport.batch.RunApp" init-method="init">
        <property name="jobOperator" ref="syncJobOperator" />
    </bean>
</beans>
CustomReader.java
package com.opensourzesupport.batch.job;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.opensourzesupport.batch.job.model.DataWrapper;

/**
 * @author Prasobh.K
 */
public class CustomReader implements ItemReader<List<DataWrapper>> {

    private int count = 0;

    @Override
    public List<DataWrapper> read() throws Exception, UnexpectedInputException, ParseException {
        count++;
        System.out.println("In CustomReader Count is : " + count);
        List<DataWrapper> retList = null;
        if (count < 3) {
            retList = new ArrayList<DataWrapper>();
            DataWrapper dataWrapper = new DataWrapper();
            dataWrapper.setCustId("" + System.currentTimeMillis());
            retList.add(dataWrapper);
            DataWrapper dataWrapper1 = new DataWrapper();
            dataWrapper1.setCustId("" + System.currentTimeMillis());
            retList.add(dataWrapper1);
        }
        // otherwise retList stays null, which signals the end of input
        return retList;
    }
}
CustomProcessor.java
package com.opensourzesupport.batch.job;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import com.opensourzesupport.batch.job.model.DataWrapper;

/**
 * @author Prasobh.K
 */
public class CustomProcessor implements ItemProcessor<List<DataWrapper>, Map<String, String>> {

    @Override
    public Map<String, String> process(List<DataWrapper> items) throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        int i = 0;
        for (DataWrapper dataWrapper : items) {
            System.out.println("In processor : " + dataWrapper.getCustId());
            // increment the key index so each item gets its own entry
            map.put("custid" + i++, dataWrapper.getCustId());
        }
        return map;
    }
}
CustomWriter.java
package com.opensourzesupport.batch.job;

import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemWriter;

/**
 * @author Prasobh.K
 */
public class CustomWriter implements ItemWriter<Map<String, String>> {

    @Override
    public void write(List<? extends Map<String, String>> items) throws Exception {
        System.out.println("In Customwriter : " + items);
    }
}
Main.java
package com.opensourzesupport.batch;

/**
 * @author Prasobh.K
 */
public class Main {

    public static void main(String[] a) {
        String[] ar = { "resources/applicationContext.xml" };
        try {
            org.springframework.batch.core.launch.support.JobRegistryBackgroundJobRunner.main(ar);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
RunApp.java
package com.opensourzesupport.batch;

import org.springframework.batch.core.launch.JobOperator;

/**
 * @author Prasobh.K
 */
public class RunApp {

    public JobOperator jobOperator;

    public void setJobOperator(JobOperator jobOperator) {
        this.jobOperator = jobOperator;
    }

    public void init() {
        // jobOperator.start expects the parameters as "key=value" pairs
        String jobParameters = "01253=" + System.currentTimeMillis();
        System.out.println("In RunApp init");
        try {
            jobOperator.start("job_dataproc", jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
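JobOperator.start takes the job name and a single String of key=value pairs, which the framework converts into JobParameters (in Spring Batch this conversion is handled by a JobParametersConverter). As a framework-free illustration of that idea, the sketch below parses such a string into a map; the keys `run.ts` and `operator` are invented for this example:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JobParametersSketch {
    public static void main(String[] args) {
        // Same style of string that RunApp passes to jobOperator.start(...)
        String jobParameters = "run.ts=" + 1350016945897L + ",operator=admin";

        // Parse comma-separated "key=value" pairs, preserving insertion order
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String pair : jobParameters.split(",")) {
            String[] kv = pair.split("=", 2); // split on the first '=' only
            parsed.put(kv[0].trim(), kv[1].trim());
        }
        System.out.println(parsed);
    }
}
```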
Output :
Before job 2950
In CustomReader Count is : 1
In CustomReader Count is : 2
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In Customwriter : [{custid0=1350016945897}, {custid0=1350016945897}]
In CustomReader Count is : 3
In CustomReader Count is : 4
After job 2950
The commit-interval decides how many items are read per chunk. If the reader returns null, reading stops and the step ends.
Below is a code representation of the same concepts shown
above:
List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
    Object item = itemReader.read();
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);
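The same loop can be made runnable without the framework by stubbing the reader with an iterator that hands out items until exhausted, mirroring the null-to-end-input contract. All names here are illustrative, and upper-casing stands in for whatever the processor would do:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {
    public static void main(String[] args) {
        final int commitInterval = 2;
        // Stub "reader": provides items until exhausted, then yields null
        Iterator<String> source = Arrays.asList("a", "b", "c").iterator();

        while (true) {
            List<String> items = new ArrayList<>();
            for (int i = 0; i < commitInterval; i++) {
                String item = source.hasNext() ? source.next() : null; // read()
                if (item == null) break;                               // end of input
                items.add(item.toUpperCase());                         // process()
            }
            if (items.isEmpty()) break;                                // nothing left
            System.out.println("write " + items);                      // write(items)
        }
    }
}
```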