1. Task 기반 배치와 Chunk 기반 배치
1.1 Task 기반 배치
배치 처리 과정이 비교적 쉽게 사용할수 있다.
대용량 처리를 할 경우에는 더 복잡할 수 있어 하나의 큰 덩어리를 여러개로 처리 하기 부적합
@Configuration
@Slf4j
public class TaskProcessConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
public TaskProcessConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job taskProcessJob(){
return jobBuilderFactory.get("taskProcessJob")
.incrementer(new RunIdIncrementer())
.start(this.teskBaseStep())
.build();
}
@Bean
public Step teskBaseStep(){
return stepBuilderFactory.get("teskBaseStep")
.tasklet(this.tesklet())
.build();
}
private Tasklet tesklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems(100);
log.info("task item size : {}", items.size());
//tesklet 종료
return RepeatStatus.FINISHED;
};
}
//단순 아이템을 생성하는 로직
private List<String> getItems(int count){
List<String> temp = new ArrayList<>();
for(int i =0; i < count; i++){
temp.add(i + "hello");
}
return temp;
}
}
1.1.1 Task 기반 배치를 수동으로 나누어 처리하기
private Tasklet tesklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems(100);
//log.info("task item size : {}", items.size());
//Task에서 Chunk처럼 데이터를 쪼개서 실행시켜 보면
StepExecution stepExecution = contribution.getStepExecution();
int chunkSize = 10;
int formIndex = stepExecution.getReadCount();
int toIndex = formIndex + chunkSize;
if(formIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> sub = items.subList(formIndex, toIndex);
log.info("task item size : {}", sub.size());
//전체 리스트에서 어디 까지 읽었는지 수동으로 갱신 시켜줌
stepExecution.setReadCount(toIndex);
//Tesk를 반복하라는 명령어
return RepeatStatus.CONTINUABLE;
};
}
1.2 Chunk 기반 배치
itemReader, itemProcesser, itemWriter로 구성되어 있고 대용량 처리에 적합하다
Chunk - 10000개의 덩어리를 1000개씩 10번에 나누어 수행하도록 설정 가능
Task - 10000개의 데이터를 한번에 수행/ 또는 수동으로 나누어야 함
@Configuration
@Slf4j
public class ChunkProcessConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
public ChunkProcessConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job chunkProcessJob(){
return jobBuilderFactory.get("chunkProcessJob")
.incrementer(new RunIdIncrementer())
.start(this.chunkBaseStep())
.build();
}
@Bean
public Step chunkBaseStep(){
return stepBuilderFactory.get("chunkBaseStep")
// 100개의 데이터를 10개씩 나누겠다는 선언
.<String, String>chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemReader<String> itemReader() {
// 스프링 배치의 기본 ItemReader
// getItems 에서 100개의 아이템이 저장된 리스트를 받음
return new ListItemReader<>(getItems(100));
}
// 리더에서 읽은 아이템에 "Spring batch" 라는 붙여준다.
// ItemProcessor 데이터가 null로 반환 하면 해당 아이템은 ItemWriter로 전달 불가
// 예제에서는 모든 데이터를 넘김
private ItemProcessor< String, String> itemProcessor() {
return item -> item +",Spring batch";
}
private ItemWriter<String> itemWriter() {
//return items -> log.info("chunk item size {}", items.size());
//ItemProcessor를 통하여 변경된 문자열을 출력
return items -> items.forEach(log::info);
}
//단순 아이템을 생성하는 로직
private List<String> getItems(int count){
List<String> temp = new ArrayList<>();
for(int i =0; i < count; i++){
temp.add(i + "hello");
}
return temp;
}
ItemReader에서 null을 반환할때 까지 Step 반복 -> 처리할 데이터가 없다는 의미
ItemReader와 ItemProcesser는 아이템을 1개 씩 받아서 처리하지만
ItemWriter는 아이템을 리스트로 받아서 처리
<INPUT, OUTPUT>chunk(int)
예제 코드의 <String, String>chunk(10) 부분
reader에서 INPUT 을 return 한다
processor에서 INPUT을 받아 processing 후 OUPUT을 return (INPUT, OUTPUT은 같은 타입일 수 있음)
writer에서 List
2. JobParameters
배치를 실행에 필요한 값을 parameter를 통해 외부에서 주입
배치 실행 시 조금 더 유연한 세팅을 위하여 사용
2.1 JobParameters 객체를 활용하여 데이터를 전달하는 방법
String parameter = jobParameters.getString(key, defaultValue);
1.1.1 예제에서 chuncksize를 받은 방법을 JobParameters를 활용하도록 변경
private Tasklet tesklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems(100);
StepExecution stepExecution = contribution.getStepExecution();
//stepExecution에서 JobParameters를 호출
JobParameters jobParameters = stepExecution.getJobParameters();
//JobParameters에서 chunkSize라는 이름의 변수값을 받음 / 없으면 10으로 세팅
int chunkSize = Integer.parseInt(jobParameters.getString("chunkSize", "10"));
int formIndex = stepExecution.getReadCount();
int toIndex = formIndex + chunkSize;
if(formIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> sub = items.subList(formIndex, toIndex);
log.info("task item size : {}", sub.size());
//전체 리스트에서 어디 까지 읽었는지 수동으로 갱신 시켜줌
stepExecution.setReadCount(toIndex);
//Tesk를 반복하라는 명령어
return RepeatStatus.CONTINUABLE;
};
}
인텔리제이의 Application 창에서 chunkSize 변수를 추가한다.
2.2 Spring EL(Expression Language)로 접근
@Value(“#{jobParameters[key]}”) 예제 1.2 에서 chunkBaseStep를 변경
@Bean
@JobScope
public Step chunkBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize){
return stepBuilderFactory.get("chunkBaseStep")
.<String, String>chunk(StringUtils.hasText(chunkSize) ? Integer.parseInt(chunkSize) : 10 )
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Value 가 lombok의 value가 아니라 org.springframework.beans.factory.annotation.Value 라는 것에 주의
위 예제와 동일하게 chunkSize 변수가 있다면 해당 데이터로 없다면 10을 기본 값으로 하도록 생성
chunkBaseStep의 시그니처가 변경되었음으로 실행을 위해 chunkProcessJob에서 chunkBaseStep을 호출하는 것도 변경
@Bean
public Job chunkProcessJob(){
return jobBuilderFactory.get("chunkProcessJob")
.incrementer(new RunIdIncrementer())
// 파라메터의 null이 들어가도 환경 변수에 설정된 데이터를 받아온다.
.start(this.chunkBaseStep(null))
.build();
}
2.3 JobScope와 StapScope의 이해
@Scope는 어떤 시점에 bean을 생성/소멸 시킬 지 bean의 lifecycle을 설정
스프링에서 @Scope는 싱글톤으로 구현되어 있음
- @JobScope는 job 실행 시점에 생성/소멸 -> Step에 선언
- @StepScope는 step 실행 시점에 생성/소멸 -> Tasklet, Chunk(ItemReader, ItemProcessor, ItemWriter) 에 선언
에제의 ItemReader, ItemProcessor, ItemWriter는 @Bean 선언이 없었지만
@StepScope를 사용하기 위해서는 @Bean으로 설정이 필요
(@StepScope의 라이프 사이클이 @Bean을 따르기 때문 -> 데이터 및 설정을 스프링 기반 시스템에 의존한다.)
Spring의 @Scope과 같은 것 이기 때문에 @Scope의 속성중 ScopeName이 있는데 아래와 같이 선언하면 기능이 동일하게 작동
@Scope(“job”) -> @JobScope / @Scope(“step”) -> @StepScope
Job과 Step 라이프사이클에 의해 생성되기 때문에 Thread safe하게 작동
@Value(“#{jobParameters[key]}”)를 사용하기 위해 @JobScope와 @StepScope는 필수
2.1 예제에서 JobParameters 사용부분을 @StepScope 사용으로 변경해보면
@Bean
@StepScope
//StepScope를 사용하기 위해 Bean으로 등록한다.
//Bean으로 등록을 하게 되면 private 선언을 할수 없기 때문에 public으로 변경
public Tasklet tesklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems(100);
StepExecution stepExecution = contribution.getStepExecution();
//stepExecution에서 JobParameters를 호출
JobParameters jobParameters = stepExecution.getJobParameters();
//JobParameters에서 chunkSize라는 이름의 변수값을 받음 / 없으면 10으로 세팅
int chunkSize = Integer.parseInt(jobParameters.getString("chunkSize", "10"));
int formIndex = stepExecution.getReadCount();
int toIndex = formIndex + chunkSize;
if(formIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> sub = items.subList(formIndex, toIndex);
log.info("task item size : {}", sub.size());
//전체 리스트에서 어디 까지 읽었는지 수동으로 갱신 시켜줌
stepExecution.setReadCount(toIndex);
//Tesk를 반복하라는 명령어
return RepeatStatus.CONTINUABLE;
};
}
tesklet의 시그니처가 변경되었기 때문에 아래와 같이 teskBaseStep을 변경
@Bean
public Step teskBaseStep(){
return stepBuilderFactory.get("teskBaseStep")
//tasklet이 bean으로 생성되었으므로 null을 넣더라도 스프링 라이프 사이클에서 파라메터를 넣어줌
.tasklet(this.tesklet(null))
.build();
}