Many applications in the enterprise domain require bulk processing to
perform business operations in mission-critical environments. These
business operations include automated, complex processing of large volumes
of information that is most efficiently processed without user interaction.
Such operations typically include time-based events (e.g. month-end
calculations, notices, or correspondence), periodic application of complex
business rules processed repetitively across very large data sets (e.g.
insurance benefit determination or rate adjustments), and the integration
of information received from internal and external systems, which
typically requires formatting, validation, and transactional processing
into the system of record. Batch processing is used to process
billions of transactions every day for enterprises.
What's New in Spring Batch 2.0

The Spring Batch 2.0 release has six major themes:

- Java 5
- Non Sequential Step Execution
- Chunk oriented processing
- Meta Data enhancements
- Scalability
- Configuration
The Domain Language of Batch

Job

A Job is an entity that encapsulates an entire batch process. As is common
with other Spring projects, a Job is wired together via an XML configuration
file. This file may be referred to as the "job configuration". However,
Job is just the top of an overall hierarchy.

JobInstance

A JobInstance refers to the concept of a logical job run.

JobParameters

Having discussed JobInstance and how it differs from Job, the natural
question to ask is: "how is one JobInstance distinguished from another?"
The answer is: JobParameters. JobParameters is a set of parameters used to
start a batch job.

JobExecution

A JobExecution refers to the technical concept of a single attempt to run
a Job.

Step

A Step is a domain object that encapsulates an independent, sequential
phase of a batch job. Therefore, every Job is composed of one or more
steps. A Step contains all of the information necessary to define and
control the actual batch processing.

StepExecution

A StepExecution represents a single attempt to execute a Step.

ExecutionContext

An ExecutionContext represents a collection of key/value pairs that are
persisted and controlled by the framework in order to give developers a
place to store persistent state that is scoped to a StepExecution or
JobExecution.

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
// ecStep does not equal ecJob
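At its core an ExecutionContext is just a key/value map that the framework persists between chunks (and across restarts). The following is a plain-Java sketch of that restart idea with no Spring classes on the classpath; `FakeExecutionContext`, `RestartableCounter`, and the `read.count` key are all made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// A stand-in for ExecutionContext: just a key/value map that the
// framework would persist to the JobRepository between chunks.
class FakeExecutionContext {
    private final Map<String, Object> state = new HashMap<>();
    public void put(String key, Object value) { state.put(key, value); }
    public int getInt(String key, int defaultValue) {
        Object v = state.get(key);
        return v == null ? defaultValue : (Integer) v;
    }
}

public class RestartableCounter {
    public static void main(String[] args) {
        FakeExecutionContext ec = new FakeExecutionContext();
        // First "execution": process 3 items, checkpointing after each one.
        int count = ec.getInt("read.count", 0);
        for (int i = 0; i < 3; i++) {
            count++;
            ec.put("read.count", count); // persisted by the framework per chunk
        }
        // On a restart, a reader would resume from the saved value instead of 0.
        System.out.println(ec.getInt("read.count", 0));
    }
}
```

Because the state is scoped to a StepExecution or JobExecution, two components in the same step can safely share checkpoint data without colliding with another job's state.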
JobRepository

JobRepository is the persistence mechanism for all of the stereotypes
mentioned above. It provides CRUD operations for JobLauncher, Job, and
Step implementations. When a Job is first launched, a JobExecution is
obtained from the repository, and during the course of execution
StepExecution and JobExecution implementations are persisted by passing
them to the repository.

JobLauncher

JobLauncher represents a simple interface for launching a Job with a given
set of JobParameters.

Item Reader

ItemReader is an abstraction that represents the retrieval of input for a
Step, one item at a time. When the ItemReader has exhausted the items it
can provide, it indicates this by returning null.

Item Writer

ItemWriter is an abstraction that represents the output of a Step, one
batch or chunk of items at a time.

Item Processor

ItemProcessor is an abstraction that represents the business processing of
an item. While the ItemReader reads one item and the ItemWriter writes
them, the ItemProcessor provides a place to transform the item or apply
other business processing.

Configuring and Running a Job
Let's look at a small application that puts this batch technology into practice.
Here is our applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd"
xmlns:j="http://www.springframework.org/schema/batch">
<bean id="jobListener" class="com.opensourzesupport.batch.job.listeners.JobListener" />
<bean id="dataReader" class="com.opensourzesupport.batch.job.CustomReader" />
<bean id="dataWriter" class="com.opensourzesupport.batch.job.CustomWriter" />
<bean id="dataProcessor" class="com.opensourzesupport.batch.job.CustomProcessor" />
<!-- Billing job flow -->
<job id="job_dataproc" xmlns="http://www.springframework.org/schema/batch"
restartable="true">
<step id="step1">
<tasklet task-executor="pooledTaskExecutor" throttle-limit="1">
<chunk reader="dataReader" writer="dataWriter" processor="dataProcessor"
commit-interval="2" />
</tasklet>
</step>
<listeners>
<listener ref="jobListener" />
</listeners>
</job>
<!-- pooledTaskExecutor is used for chunk processing; otherwise a separate
thread would be spawned for each chunk -->
<bean id="pooledTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="2" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="25" />
</bean>
<j:job-repository id="jobRepository" data-source="applicationSettingsDBdataSource"
transaction-manager="transactionManager" isolation-level-for-create="SERIALIZABLE"
table-prefix="BATCH_" />
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" lazy-init="true">
</bean>
<!-- Optional: a JobRegistry (and its parent interface JobLocator) is not
mandatory, but it can be useful if you want to keep track of which jobs are
available in the context. It is also useful for collecting jobs centrally -->
<bean id="jobRegistry"
class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
<!-- Optional: used only if a JobRegistry is used. This is a bean
post-processor that can register all jobs as they are created -->
<bean id="jobRegistryBeanPostProcessor"
class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
<!-- The following beans relate to asynchronous job launching -->
<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
</bean>
<bean id="asyncJobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="asyncTaskExecutor" />
</bean>
<!-- The order-data monthly, ad-hoc and billing services use asyncJobOperator -->
<bean id="asyncJobOperator"
class="org.springframework.batch.core.launch.support.SimpleJobOperator">
<property name="jobExplorer">
<bean
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
<property name="dataSource" ref="applicationSettingsDBdataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="asyncJobLauncher" />
</bean>
<!-- End of asynchronous job launching -->
<!-- The following beans relate to synchronous job launching; currently no
job uses syncJobOperator -->
<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor">
</bean>
<bean id="syncJobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="syncTaskExecutor" />
</bean>
<bean id="syncJobOperator"
class="org.springframework.batch.core.launch.support.SimpleJobOperator">
<property name="jobExplorer">
<bean
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
<property name="dataSource" ref="applicationSettingsDBdataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="syncJobLauncher" />
</bean>
<bean id="applicationSettingsDBdataSource" class="oracle.jdbc.pool.OracleDataSource"
destroy-method="close">
<property name="URL" value="oracle.url" />
<property name="user" value="username" />
<property name="password" value="password" />
</bean>
<bean class="com.opensourzesupport.batch.RunApp" init-method="init" >
<property name="jobOperator" ref="syncJobOperator" />
</bean>
</beans>
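The pooledTaskExecutor bean above (core pool 2, max pool 10, queue capacity 25) maps directly onto a plain java.util.concurrent pool. A minimal stand-alone equivalent, assuming the same sizing, might look like this:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same sizing as the Spring ThreadPoolTaskExecutor bean:
        // corePoolSize=2, maxPoolSize=10, queueCapacity=25
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(25));
        for (int i = 0; i < 4; i++) {
            final int chunk = i;
            pool.execute(() -> System.out.println("processing chunk " + chunk));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Reusing one bounded pool is why the config comments warn about spawning a separate thread per chunk: without it, every chunk would pay thread-creation cost and there would be no upper bound on concurrency.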
CustomReader.java

package com.opensourzesupport.batch.job;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.opensourzesupport.batch.job.model.DataWrapper;

/**
 * @author Prasobh.K
 */
public class CustomReader implements ItemReader<List<DataWrapper>> {

    private int count = 0;

    @Override
    public List<DataWrapper> read() throws Exception, UnexpectedInputException, ParseException {
        count++;
        System.out.println("In CustomReader Count is : " + count);
        List<DataWrapper> retList = null;
        if (count < 3) {
            retList = new ArrayList<DataWrapper>();
            DataWrapper dataWrapper = new DataWrapper();
            dataWrapper.setCustId("" + System.currentTimeMillis());
            retList.add(dataWrapper);
            DataWrapper dataWrapper1 = new DataWrapper();
            dataWrapper1.setCustId("" + System.currentTimeMillis());
            retList.add(dataWrapper1);
        }
        // otherwise retList stays null, which signals the end of input
        return retList;
    }
}
CustomProcessor.java

package com.opensourzesupport.batch.job;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import com.opensourzesupport.batch.job.model.DataWrapper;

/**
 * @author Prasobh.K
 */
public class CustomProcessor implements ItemProcessor<List<DataWrapper>, Map<String, String>> {

    @Override
    public Map<String, String> process(List<DataWrapper> items) throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        int i = 0;
        for (DataWrapper dataWrapper : items) {
            System.out.println("In processor : " + dataWrapper.getCustId());
            // note: i is never incremented, so every item lands under "custid0"
            // (which is why the writer output below shows only custid0)
            map.put("custid" + i, dataWrapper.getCustId());
        }
        return map;
    }
}
CustomWriter.java

package com.opensourzesupport.batch.job;

import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemWriter;

/**
 * @author Prasobh.K
 */
public class CustomWriter implements ItemWriter<Map<String, String>> {

    @Override
    public void write(List<? extends Map<String, String>> items) throws Exception {
        System.out.println("In Customwriter : " + items);
    }
}
Main.java
package com.opensourzesupport.batch;
/**
*
* @author Prasobh.K
*
*/
public class Main {
public static void main(String[] a) {
String[] ar = { "resources/applicationContext.xml" };
try {
org.springframework.batch.core.launch.support.JobRegistryBackgroundJobRunner
.main(ar);
} catch (Exception e) {
e.printStackTrace();
}
}
}
RunApp.java
package com.opensourzesupport.batch;
import org.springframework.batch.core.launch.JobOperator;
/**
*
* @author Prasobh.K
*
*/
public class RunApp {
public JobOperator jobOperator;
public void setJobOperator(JobOperator jobOperator) {
this.jobOperator = jobOperator;
}
public void init() {
String jobParameters = "01253 =" + System.currentTimeMillis();
System.out.print("In RunApp init");
try {
jobOperator.start("job_dataproc", jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
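The second argument to JobOperator.start() is a String of key=value pairs; here a single throwaway parameter whose value is the current time, so that every run gets distinct JobParameters and therefore a new JobInstance. As an illustrative (and much simplified) sketch of how such a string breaks down into a map, along the lines of what Spring Batch's DefaultJobParametersConverter does:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ParamDemo {
    // Simplified, illustrative key=value parsing of the parameter string
    // passed to JobOperator.start(jobName, parameters).
    static Map<String, String> parse(String params) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : params.split(",")) {
            int eq = pair.indexOf('=');
            if (eq >= 0) {
                result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "run.id" and "source" are hypothetical parameter names
        System.out.println(parse("run.id=42,source=monthly"));
    }
}
```
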
Output :
Before job 2950
In CustomReader Count is : 1
In CustomReader Count is : 2
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In Customwriter : [{custid0=1350016945897}, {custid0=1350016945897}]
In CustomReader Count is : 3
In CustomReader Count is : 4
After job 2950
In CustomReader Count is : 1
In CustomReader Count is : 2
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In Customwriter : [{custid0=1350016945897}, {custid0=1350016945897}]
In CustomReader Count is : 3
In CustomReader Count is : 4
After job 2950
The commit-interval decides how many items are read and accumulated before the writer is called. If the reader returns null, reading stops.
Below is a code representation of the same concepts shown
above:
List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
    Object item = itemReader.read();
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);
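To make the loop concrete, here is a self-contained simulation of the same chunk sequence in plain Java (no Spring). The reader returns two lists and then null, mirroring CustomReader; the item names are made up for the demo:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkLoopDemo {
    private static int count = 0;

    // Mirrors CustomReader: two non-null reads, then null to signal end of input.
    static List<String> read() {
        count++;
        if (count < 3) {
            return Arrays.asList("item-" + count + "a", "item-" + count + "b");
        }
        return null;
    }

    static String process(List<String> items) {
        return "processed" + items;
    }

    static void write(List<String> chunk) {
        System.out.println("writing " + chunk);
    }

    public static void main(String[] args) {
        int commitInterval = 2;
        while (true) {
            List<String> chunk = new ArrayList<>();
            for (int i = 0; i < commitInterval; i++) {
                List<String> item = read();
                if (item == null) break;   // reader exhausted: stop reading
                chunk.add(process(item));  // transform each item before buffering
            }
            if (chunk.isEmpty()) break;    // nothing was read: the step is done
            write(chunk);                  // one write per commit interval
        }
    }
}
```

Note how the writer is invoked once per chunk of commitInterval processed items, not once per item; that is the essence of chunk-oriented processing.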



