1. ItemReader InterFace 구조
기본 제공되는 ItemReader 구현체가 많음 (file, jdbc, Jpa 등등)
ItemReader 구현체가 없으면 직접 개발
ItemStream은 ExecutionContext로 read, write 정보를 저장
CustomItemReader 구현 예제
public class CustomItemReader<T> implements ItemReader<T> {
//예제에서는 List를 받는 ItemReader로 설정
private final List<T> items;
//생성자를 통하여 필드를 세팅할수 있도록 정의
public CustomItemReader(List<T> items) {
this.items = new ArrayList<>(items);
}
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!items.isEmpty()) {
// remove() 메소드는 0번째 인덱스 반환 후 리스트에서 제거한다.
// ItemReader는 1개씩 데이터를 전달한다.
return items.remove(0);
}
return null;
}
}
위 예제의 CustomItemReader 작동 확인을 위하여 클래스 생성
@Entity
@NoArgsConstructor
@Getter
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
private String name;
private String age;
private String address;
public Person(String name, String age, String address) {
this(0, name, age, address);
}
public Person(int id, String name, String age, String address) {
this.id = id;
this.name = name;
this.age = age;
this.address = address;
}
public boolean isNotEmptyName() {
return Objects.nonNull(this.name) && !name.isEmpty();
}
public Person unknownName() {
this.name = "UNKNOWN";
return this;
}
}
이제 생성한 CustomItemReader를 통하여 Parson 객체를 전달 받고 출력하는 소스코드를 작성한다.
(customItemReaderStep()과 itemReaderJob() 를 설정 )
@Configuration
@Slf4j
public class ItemReaderConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
public ItemReaderConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job itemReaderJob() throws Exception {
return this.jobBuilderFactory.get("itemReaderJob")
.incrementer(new RunIdIncrementer())
.start(this.customItemReaderStep())
.build();
}
@Bean
public Step customItemReaderStep() {
return this.stepBuilderFactory.get("customItemReaderStep")
.<Person, Person>chunk(10)
.reader(new CustomItemReader<>(getItems()))
.writer(itemWriter())
.build();
}
// Person의 name을 ,(콤마) 를 붙여 구분하면서 한줄로 출력
private ItemWriter<Person> itemWriter() {
return items -> log.info(
items.stream()
.map(Person::getName)
.collect(Collectors.joining(", ")));
}
private List<Person> getItems() {
List<Person> items = new ArrayList<>();
for (int i = 0; i < 10; i++) {
items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
}
return items;
}
}
1.1. FlatFileItemReader
파일에 저장된 데이터를 읽어 객체에 매핑 하는 ItemReader
테스트를 위한 csv 파일을 생성 (test.csv 파일을 만든다.)
위에서 생성한 Parson 객체는 그대로 사용한다.
id,이름,나이,거주지
1,이경원,32,인천
2,홍길동,30,서울
3,아무개,25,강원
private FlatFileItemReader<Person> csvFileItemReader() throws Exception {
/** CSV파일을 Parson 객체에 매핑 **/
//파일을 한줄씩 읽을수 있는 LineMapper 설정
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
//파읠의 필드명을 설정할수 있는 Tokenizer 선언
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "age", "address");
//LineMapper에 Tokenizer 세팅
lineMapper.setLineTokenizer(tokenizer);
//데이터를 필드 값에 맞추어 세팅
lineMapper.setFieldSetMapper(fieldSet -> {
int id = fieldSet.readInt("id");
String name = fieldSet.readString("name");
String age = fieldSet.readString("age");
String address = fieldSet.readString("address");
//읽은 데이터를 Person 객체로 전달
return new Person(id, name, age, address);
});
/** FlatFileItemReader 생성 **/
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("csvFileItemReader")
.encoding("UTF-8")
// Resource 디랙토리 아래 파일을 읽을수 있는 스프링 클래스
.resource(new ClassPathResource("test.csv"))
//첫번째 라인을 무시 (Parson의 필드명이기 떄문에)
.linesToSkip(1)
//위에서 설정한 LineMapper 적용
.lineMapper(lineMapper)
.build();
//ItemReader에 필요한 필수 설정값이 정상적으로 세팅 되었는지 검증
//Exception를 출력하기 때문에 throws Exception 추가
itemReader.afterPropertiesSet();
return itemReader;
}
// 생성한 csvFileItemReader을 실행시키기 위한 Step 세팅
@Bean
public Step csvFileStep() throws Exception {
return stepBuilderFactory.get("csvFileStep")
.<Person, Person>chunk(10)
.reader(this.csvFileItemReader())
.writer(itemWriter())
.build();
}
1.2 JdbcCursorItemReader
1.2.1 Cursor 개념
jtbc의 ResultSet은 Cursor를 구현한 클래스로 쿼리를 실행하기 위해서 Connection이 연결된 상태에서 Cursor의 위치를 한개씩 이동시키면서 데이터를 불러오게 되어 있다.
Cursor도 ItemSteam을 사용하여 데이터를 읽어오는데 open() -> update() -> close()의 순으로 데이터를 읽어오게 되며 모든 데이터를 읽었다는 기준은 update()가 null을 반환하는 것으로 되어 있다. 따라서 아래와 같은 특징을 갖고 있다.
- 배치 처리가 완료될 때 까지 DB Connection이 연결
- DB Connection 빈도가 낮아 성능이 좋은 반면, 긴 Connection 유지 시간 필요
- 하나의 Connection에서 처리되기 때문에, Thread Safe 하지 않음
- 모든 결과를 메모리에 할당하기 때문에, 더 많은 메모리를 사용
1.2.2 Paging 개념
Cursor와 반대로 Connection을 짧게 유지하면서 데이터를 조회 하는 paging 방식이 있다.
- 페이징 단위로 DB Connection을 연결
- DB Connection 빈도가 높아 비교적 성능이 낮은 반면, 짧은 Connection 유지 시간 필요
- 매번 Connection을 하기 때문에 Thread Safe
- 페이징 단위의 결과만 메모리에 할당하기 때문에, 비교적 더 적은 메모리를 사용
JdbcCursorItemReader는 DB기반 읽기 임으로 예제를 위한 sql 코드를 생성한다.
생성한 코드는 resource 폴더 아래에 parson.sql로 지정하였다.
create table person (
id bigint primary key auto_increment,
name varchar(255),
age varchar(255),
address varchar(255)
);
insert into person(name, age, address)
values('이경원','32','인천');
insert into person(name, age, address)
values('홍길동','30','서울');
insert into person(name, age, address)
values('아무개','25','강원');
테이블 구성이 완료되었으면 application.yml에 설정한 테이블을 읽을수 있도록 구성한다.
이때 데이터 베이스는 h2 DB를 사용한다. (다른 DB를 사용해도 무방)
spring:
batch:
job:
names: ${job.name:NONE}
initialize-schema:
datasource:
driver-class-name: org.h2.Driver
data: classpath:person.sql
생성된 데이터 베이스를 연결해야 하기 때문에 이전에 구현하였던 ItemReaderConfiguration에 DataSource 필드를 추가하고 생성자를 다시 만들어 준다.
@Configuration
@Slf4j
public class ItemReaderConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
//추가된 DataSource 클래스
private final DataSource dataSource;
public ItemReaderConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
DataSource dataSource) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.dataSource = dataSource;
}
//이하 생략
}
생성자로 주입받은 dataSource를 활용하여 JdbcCursorItemReader를 구현한다.
커서 기반 JdbcItemReader에서는 sql 문을 활용하여 데이터를 조회한다.
private JdbcCursorItemReader<Person> jdbcCursorItemReader() throws Exception {
JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReaderBuilder<Person>()
.name("jdbcCursorItemReader")
//스프링 부트에서는 application.yml을 기반으로 dataSource를 자동으로 생성해 준다.
.dataSource(dataSource)
//데이터 조회 쿼리
.sql("select id, name, age, address from person")
//쿼리를 통하여 조회된 데이터를 Person 객체에 매핑 시켜 준다.
//람다 식으로 설정할수 있다.
//컬럼 인덱스는 0이 아니라 1부터 시작한다.
.rowMapper((rs, rowNum) -> new Person(
rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
이제 위에서 생성한 JdbcCursorItemReader를 실행시킬수 있는 Step을 설정해주고
Job에 추가한 Step을 넣어준다. (Job은 이전 예제를 이어 사용)
@Bean
public Step jdbcStep() throws Exception {
return stepBuilderFactory.get("jdbcStep")
.<Person, Person>chunk(10)
.reader(jdbcCursorItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job itemReaderJob() throws Exception {
return this.jobBuilderFactory.get("itemReaderJob")
.incrementer(new RunIdIncrementer())
.start(this.customItemReaderStep())
.next(this.csvFileStep())
//jdbcStep을 이어 실행하도록 추가
.next(this.jdbcStep())
.build();
}
1.3 JpaCursorItemReader
- 기존에는 Jpa는 Paging 기반의 ItemReader만 제공됨
- 스프링 4.3+ 에서 Jpa 기반 Cursor ItemReader가 제공됨
jpa는 EntityManager를 사용하여 데이터를 조회하기 때문에 기존 Person 객체를 Entity로 만들어 준다.
@Entity
@NoArgsConstructor
@Getter
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
private String name;
private String age;
private String address;
//ID를 받지 않는 생성자를 만듬
//JPA에서 자체적으로 ID를 생성해서 리턴해 준다.
public Person(String name, String age, String address) {
this(0, name, age, address);
}
public Person(int id, String name, String age, String address) {
this.id = id;
this.name = name;
this.age = age;
this.address = address;
}
}
jdbc와 마찬가지로 jpa를 사용하기 위해서 EntityManagerFactory를 받아와아 햔다.
스프링 부트의 기능을 활용하여 ItemReaderConfiguration에 생성자로 EntityManagerFactory를 받아오도록 설정한다.
@Configuration
@Slf4j
public class ItemReaderConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
//EntityManagerFactory를 Injection 시킨다.
private final EntityManagerFactory entityManagerFactory;
public ItemReaderConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
DataSource dataSource,
EntityManagerFactory entityManagerFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.dataSource = dataSource;
this.entityManagerFactory = entityManagerFactory;
}
//이하 생략
}
위에서 추가한 entityManagerFactory를 사용하는 JpaCursorItemReader를 만든다.
특이한 점은 queryString에서 사용하는 쿼리가 JPQL 쿼리 라는 것이다.
private JpaCursorItemReader<Person> jpaCursorItemReader() throws Exception {
JpaCursorItemReader<Person> itemReader = new JpaCursorItemReaderBuilder<Person>()
.name("jpaCursorItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("select p from Person p")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
위에서 생성한 JpaCursorItemReader를 실행시키기 위해 Step을 생성하고 기존에 생성한 itemReaderJob()에 Step을 추가한다.
@Bean
public Step jpaStep() throws Exception {
return stepBuilderFactory.get("jpaStep")
// Input을 Person 타입으로 Output을 Person으로 설정
.<Person, Person>chunk(10)
.reader(this.jpaCursorItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job itemReaderJob() throws Exception {
return this.jobBuilderFactory.get("itemReaderJob")
.incrementer(new RunIdIncrementer())
.start(this.customItemReaderStep())
.next(this.csvFileStep())
.next(this.jdbcStep())
//jpaStep을 추가하였다.
.next(this.jpaStep())
.build();
}
csvFileStep, jdbcStep, jpaStep 모두 같은 writer를 사용하도록 설정하였기 때문에 똑같은 결과가 3번 출력되는 것을 확인할 수 있다.