Thursday 11 October 2012

Spring Batch 2.0

Many applications within the enterprise domain require bulk processing to perform business operations in mission critical environments. These business operations include automated, complex processing of large volumes of information that is most efficiently processed without user interaction. These operations typically include time based events (e.g. month-end calculations, notices or correspondence), periodic application of complex business rules processed repetitively across very large data sets (e.g. Insurance benefit determination or rate adjustments), or the integration of information that is received from internal and external systems that typically requires formatting, validation and processing in a transactional manner into the system of record. Batch processing is used to process billions of transactions every day for enterprises.

What's New in Spring Batch 2.0

The Spring Batch 2.0 release has six major themes:
  • Java 5
  • Non Sequential Step Execution
  • Chunk oriented processing
  • Meta Data enhancements
  • Scalability
  • Configuration

The Domain Language of Batch

Job

A Job is an entity that encapsulates an entire batch process. As is common with other Spring projects, a Job is wired together via an XML configuration file, which may be referred to as the "job configuration". However, Job is just the top of an overall hierarchy: a Job has one or more JobInstances, and each JobInstance in turn has one or more JobExecutions.


JobInstance

A JobInstance refers to the concept of a logical job run. For example, a day-end job that runs once per day has a separate logical run, and hence a separate JobInstance, for each day.

JobParameters

Having discussed JobInstance and how it differs from Job, the natural question to ask is: "how is one JobInstance distinguished from another?" The answer is: JobParameters. JobParameters is a set of parameters used to start a batch job.
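
For example, parameters can be constructed programmatically with JobParametersBuilder; a minimal sketch (the parameter names and values here are only illustrative):

JobParameters jobParameters = new JobParametersBuilder()
  .addString("input.file", "data/customers.csv") // illustrative key and value
  .addLong("run.id", System.currentTimeMillis())
  .toJobParameters();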

JobExecution

A JobExecution refers to the technical concept of a single attempt to run a Job. An execution may end in failure or success, and a JobInstance is not considered complete until an execution for it completes successfully.

Step

A Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing.

StepExecution

A StepExecution represents a single attempt to execute a Step. A new StepExecution is created each time a Step is run, just as with JobExecution.

ExecutionContext

An ExecutionContext represents a collection of key/value pairs that are persisted and controlled by the framework, giving developers a place to store state that is scoped to a StepExecution or JobExecution.

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
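
Values are stored and read back through typed accessors; a minimal sketch (the key name is illustrative):

// save a restart point in the step-scoped context
ecStep.putLong("current.index", 42); // illustrative key
long restartIndex = ecStep.getLong("current.index");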
 

JobRepository

JobRepository is the persistence mechanism for all of the stereotypes mentioned above. It provides CRUD operations for JobLauncher, Job, and Step implementations. When a Job is first launched, a JobExecution is obtained from the repository, and during the course of execution StepExecution and JobExecution implementations are persisted by passing them to the repository. With the batch XML namespace it is configured through the <j:job-repository> element, as shown in the sample configuration below.

JobLauncher

JobLauncher represents a simple interface for launching a Job with a given set of JobParameters.
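
In the Spring Batch 2.x API it looks essentially like this:

public interface JobLauncher {

 public JobExecution run(Job job, JobParameters jobParameters)
   throws JobExecutionAlreadyRunningException, JobRestartException,
     JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}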

Item Reader

ItemReader is an abstraction that represents the retrieval of input for a Step, one item at a time. When the ItemReader has exhausted the items it can provide, it will indicate this by returning null.
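
Its contract is a single method; in the 2.x API it is essentially:

public interface ItemReader<T> {

 T read() throws Exception, UnexpectedInputException, ParseException;
}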

Item Writer

ItemWriter is an abstraction that represents the output of a Step, one batch or chunk of items at a time.
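
The corresponding 2.x interface receives a whole chunk at once, essentially:

public interface ItemWriter<T> {

 void write(List<? extends T> items) throws Exception;
}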

Item Processor

ItemProcessor is an abstraction that represents the business processing of an item. While the ItemReader reads one item and the ItemWriter writes a chunk of them, the ItemProcessor provides a hook to transform the item or apply other business processing.
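
Its contract is one transformation method; returning null from process filters the item out of the chunk:

public interface ItemProcessor<I, O> {

 O process(I item) throws Exception;
}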

Configuring and Running a Job

Let's look at a small application that puts this batch machinery into practice.

Here is our applicationContext.xml:

  
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:j="http://www.springframework.org/schema/batch"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop.xsd">



 <bean id="jobListener" class="com.opensourzesupport.batch.job.listeners.JobListener" />
 <bean id="dataReader" class="com.opensourzesupport.batch.job.CustomReader" />
 <bean id="dataWriter" class="com.opensourzesupport.batch.job.CustomWriter" />
 <bean id="dataProcessor" class="com.opensourzesupport.batch.job.CustomProcessor" />

 <!-- Billing job flow -->
 <job id="job_dataproc" xmlns="http://www.springframework.org/schema/batch"
  restartable="true">
  <step id="step1">
   <tasklet task-executor="pooledTaskExecutor" throttle-limit="1">
    <chunk reader="dataReader" writer="dataWriter" processor="dataProcessor"
     commit-interval="2" />
   </tasklet>
  </step>
  <listeners>
   <listener ref="jobListener" />
  </listeners>
 </job>

 <!-- pooledTaskExecutor is used for chunk processing; otherwise a separate
  thread would be spawned for each chunk -->
 <bean id="pooledTaskExecutor"
  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="2" />
  <property name="maxPoolSize" value="10" />
  <property name="queueCapacity" value="25" />
 </bean>

 <j:job-repository id="jobRepository" data-source="applicationSettingsDBdataSource"
  transaction-manager="transactionManager" isolation-level-for-create="SERIALIZABLE"
  table-prefix="BATCH_" />
 <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" lazy-init="true">
    </bean>
 <!-- Optional: a JobRegistry (and its parent interface, JobLocator) is not
  mandatory, but it can be useful if you want to keep track of which jobs are
  available in the context. It is also useful for collecting jobs centrally -->
 <bean id="jobRegistry"
  class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

 <!-- Optional: used only together with the JobRegistry. This bean
  post-processor registers all jobs as they are created -->
 <bean id="jobRegistryBeanPostProcessor"
  class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
  <property name="jobRegistry" ref="jobRegistry" />
 </bean>


 <!-- The following beans relate to asynchronous job launching -->
 <bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 </bean>
 <bean id="asyncJobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository" />
  <property name="taskExecutor" ref="asyncTaskExecutor" />
 </bean>
 <!-- Monthly order-data, ad hoc, and billing services use asyncJobOperator -->
 <bean id="asyncJobOperator"
  class="org.springframework.batch.core.launch.support.SimpleJobOperator">
  <property name="jobExplorer">
   <bean
    class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
    <property name="dataSource" ref="applicationSettingsDBdataSource" />
   </bean>
  </property>
  <property name="jobRepository" ref="jobRepository" />
  <property name="jobRegistry" ref="jobRegistry" />
  <property name="jobLauncher" ref="asyncJobLauncher" />
 </bean>
 <!-- End of asynchronous job launching -->


 <!-- The following beans relate to synchronous job launching; RunApp below
  starts the job through syncJobOperator -->
 <bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor">
 </bean>
 <bean id="syncJobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository" />
  <property name="taskExecutor" ref="syncTaskExecutor" />
 </bean>
 <bean id="syncJobOperator"
  class="org.springframework.batch.core.launch.support.SimpleJobOperator">
  <property name="jobExplorer">
   <bean
    class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
    <property name="dataSource" ref="applicationSettingsDBdataSource" />
   </bean>
  </property>
  <property name="jobRepository" ref="jobRepository" />
  <property name="jobRegistry" ref="jobRegistry" />
  <property name="jobLauncher" ref="syncJobLauncher" />
 </bean>
 <bean id="applicationSettingsDBdataSource" class="oracle.jdbc.pool.OracleDataSource"
  destroy-method="close">
  <property name="URL" value="oracle.url" />
  <property name="user" value="username" />
  <property name="password" value="password" />

 </bean>
 
 <bean class="com.opensourzesupport.batch.RunApp" init-method="init"   >
 <property name="jobOperator" ref="syncJobOperator" />
 </bean>
</beans>

CustomReader.java

 
package com.opensourzesupport.batch.job;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.opensourzesupport.batch.job.model.DataWrapper;
/**
 * 
 * @author Prasobh.K
 *
 */
public class CustomReader implements ItemReader<List<DataWrapper>> {

 private int count = 0;

 @Override
 public List<DataWrapper> read() throws Exception, UnexpectedInputException,
   ParseException {
  count++;
  System.out.println("In CustomReader Count is : " + count);
  List<DataWrapper> retList = null;
  if (count < 3) {
   retList = new ArrayList<DataWrapper>();
   DataWrapper dataWrapper = new DataWrapper();
   dataWrapper.setCustId("" + System.currentTimeMillis());
   retList.add(dataWrapper);
   DataWrapper dataWrapper1 = new DataWrapper();
   dataWrapper1.setCustId("" + System.currentTimeMillis());
   retList.add(dataWrapper1);
  }
  // from the third call on, retList stays null, which signals the end of input
  return retList;
 }

}

CustomProcessor.java

package com.opensourzesupport.batch.job;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;
import com.opensourzesupport.batch.job.model.DataWrapper;

/**
 * 
 * @author Prasobh.K
 * 
 */
public class CustomProcessor implements
  ItemProcessor<List<DataWrapper>, Map<String, String>> {

 @Override
 public Map<String, String> process(List<DataWrapper> arg0) throws Exception {

  Map<String, String> map = new HashMap<String, String>();

  int i = 0;
  for (DataWrapper dataWrapper : arg0) {
   System.out.println("In processor : " + dataWrapper.getCustId());
   // i is never incremented, so both items share the key "custid0" and the
   // second overwrites the first (visible in the output below)
   map.put("custid" + i, dataWrapper.getCustId());
  }
  return map;
 }

}

CustomWriter.java

package com.opensourzesupport.batch.job;

import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemWriter;

/**
 * 
 * @author Prasobh.K
 * 
 */
public class CustomWriter implements ItemWriter<Map<String, String>> {

 @Override
 public void write(List<? extends Map<String, String>> arg0)
   throws Exception {
  System.out.println("In Customwriter : " + arg0);
 }

}

Main.java

package com.opensourzesupport.batch;

/**
 * 
 * @author Prasobh.K
 * 
 */
public class Main {

 public static void main(String[] a) {
  // JobRegistryBackgroundJobRunner loads the application context and keeps
  // it alive so that jobs registered in the JobRegistry can be launched
  String[] ar = { "resources/applicationContext.xml" };
  try {
   org.springframework.batch.core.launch.support.JobRegistryBackgroundJobRunner
     .main(ar);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
 

RunApp.java

package com.opensourzesupport.batch;

import org.springframework.batch.core.launch.JobOperator;

/**
 * 
 * @author Prasobh.K
 * 
 */
public class RunApp {

 private JobOperator jobOperator;

 public void setJobOperator(JobOperator jobOperator) {
  this.jobOperator = jobOperator;
 }

 public void init() {
  // JobOperator.start takes the job name and the parameters as a String of
  // key=value pairs; the changing timestamp makes every run a new JobInstance
  String jobParameters = "01253=" + System.currentTimeMillis();
  System.out.println("In RunApp init");

  try {
   jobOperator.start("job_dataproc", jobParameters);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Output:

Before job 2950
In CustomReader Count is : 1
In CustomReader Count is : 2
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In processor : 1350016945897
In Customwriter : [{custid0=1350016945897}, {custid0=1350016945897}]
In CustomReader Count is : 3
In CustomReader Count is : 4
After job 2950

 

The commit-interval decides how many items are read per chunk; when the reader returns null, reading stops and the step ends.
Below is a code representation of the same concepts shown above:

List<Object> items = new ArrayList<Object>();
for (int i = 0; i < commitInterval; i++) {
    Object item = itemReader.read();
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

Jars Used

At a minimum, this example needs spring-batch-core and spring-batch-infrastructure, the core Spring jars (spring-core, spring-beans, spring-context, spring-tx, spring-aop), commons-logging, and the Oracle JDBC driver for the OracleDataSource.