Recent Posts
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 겨울 부산
- taint
- 코루틴 빌더
- 달인막창
- table not found
- VARCHAR (1)
- 티스토리챌린지
- Spring Batch
- 개성국밥
- Value too long for column
- 헥사고날아키텍처 #육각형아키텍처 #유스케이스
- JanusWebRTCGateway
- k8s #kubernetes #쿠버네티스
- 자원부족
- preemption #
- python
- PersistenceContext
- mp4fpsmod
- JanusGateway
- 코루틴 컨텍스트
- tolerated
- 깡돼후
- 오블완
- JanusWebRTC
- terminal
- kotlin
- PytestPluginManager
- vfr video
- pytest
- JanusWebRTCServer
Archives
너와 나의 스토리
[Spring] 스프링 부트 배치 기본 지식과 설정: 용어 정리, Chunk 지향 프로세싱/Tasklet & Listener 설정 / Step 흐름 제어 본문
개발/Spring Boot
[Spring] 스프링 부트 배치 기본 지식과 설정: 용어 정리, Chunk 지향 프로세싱/Tasklet & Listener 설정 / Step 흐름 제어
노는게제일좋아! 2021. 1. 18. 13:57반응형
Spring Boot Batch 설명
배치(Batch)란?
- 프로그램의 흐름에 따라 순차적으로 자료를 처리한다는 뜻
- 배치 처리 = 일괄 처리
- ex) 휴면회원 전환 기능 -> 대용량 데이터를 일괄 처리
Job
- 배치 처리 과정을 하나의 단위로 만들어 표현한 객체
- 하나의 job 안에는 여러 step이 있다. 각 step을 배치의 기본 흐름대로 구현하다.
- Job 객체를 만드는 빌더는 여러 개 있다. 여러 빌더를 통합 처리하는 공장인 JobBuilderFactory로 원하는 Job을 쉽게 만들 수 있다.
- Job은 Step 또는 Flow 인스턴스의 컨테이너 역할을 하기 때문에 생성하기 전에 인스턴스를 전달받는다.
JobBuilderFactory 클래스
- JobBuilderFactory 클래스의 get() 메서드로 JobBuilder를 생성할 수 있다.
- JobBuilder get(String name);
- 이렇게 생성된 JobBuilder로 Job을 생성한다.
JobBuilder의 메서드
- SimpleJobBuilder start(Step step);
- Step을 추가해서 가장 기본이 되는 SimpleJobBuilder를 생성
- JobFlowBuilder start(Flow flow);
- Flow를 실행할 JobFlowBuilder를 생성
- JobFlowBuilder flow(Step step);
- Step을 실행할 JobFlowBuilder를 생성
<Job 생성 예제>
- InactiveUserConfig.java
- 일괄적으로 휴면 처리하기 위한 Job
- 해당 Job을 Bean으로 등록하는 configuration 코드
package com.example.batch.jobs;
import lombok.AllArgsConstructor;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.Job;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@AllArgsConstructor
@Configuration
public class InactiveUserConfig {
@Bean
public Job InactiveUserJob(JobBuilderFactory jobBuilderFactory, Step inactiveJobStep){
return jobBuilderFactory.get("inactiveUserJob")
.preventRestart()
.start(inactiveJobStep)
.build();
}
}
JobInstance
- JobInstance는 배치에서 Job이 실행될 때 하나의 Job 실행 단위이다.
- 하루에 한 번씩 배치의 Job이 실행된다면, 각각의 Job을 JobInstance라고 부를 수 있다.
- 오늘 Job을 실행했는데 실패했다면 다음날 동일한 JobInstance를 가지고 다시 실행하게 된다.
- Job 실행이 실패하면 JobInstance가 끝난 것으로 간주하지 않기 때문이다.
- 그러면, 이 하나의 JobInstance는 어제 실패한 JobExecution과 오늘의 성공한 JobExecution 두 개를 가지게 된다.
- 즉, 하나의 JobInstance는 여러 개의 JobExecution을 가지게 된다.
JobExecution
- JobExecution은 JobInstance에 대한 한 번의 실행을 나타내는 객체이다.
- JobExecution은 Job 실행에 대한 정보를 담고 있는 도메인 객체이다.
- JobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 실패했을 때의 메시지 등의 정보를 담고 있다.
Step
- Step은 실질적인 배치 처리를 정의하고 제어하는 데 필요한 모든 정보가 들어 있는 도메인 객체이다.
- Job을 처리하는 실질적인 단위
- 모든 Job은 1개 이상의 Step을 가진다.
StepExecution
- Step 실행 정보를 담는 객체.
- 각각의 Step이 실행될 때마다 StepExecution이 생성된다.
JobRepository
- JobRepository는 배치 처리 정보를 담고 있는 메커니즘이다.
- 어떤 Job이 실행되었으며 몇 번 실행되었고 언제 끝났는지 등 배치 처리에 대한 메타데이터를 저장한다.
- 예: Job 하나가 실행되면 JobRepository에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution을 생성한다.
- JobRepository는 StepExecution도 저장소에 저장하며 전체 메타데이터를 저장/관리하는 역할을 수행한다.
JobLauncher
- JobLauncher는 Job, JobParameters와 함께 배치를 실행하는 인터페이스이다.
- run() 메서드 하나만 가짐.
- JobExecution run(Job job, JobParameters jobParameters);
ItemReader
- ItemReader는 Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스이다.
- FILE, XML, DB 등 여러 타입의 데이터를 읽어올 수 있다.
- "ListItemReader<> 객체"를 사용하면 모든 데이터를 한 번에 가져와 메모리에 올려놓고 read() 메서드로 하나씩 배치 처리 작업을 수행할 수 있다.
- 메서드:
- T read();
ItemProcessor
- ItemProcessor은 ItemReader로 읽어온 배치 데이터를 변환하는 역할을 수행한다.
- 메서드:
- 0 process(I item);
ItemWriter
- ItemWriter는 배치 데이터를 저장한다.
- 일반적으로 DB나 파일에 저장한다.
Chunk 지향 프로세싱
- Chunk oriented processing은 트랜잭션 경계 내에서 청크 단위로 데이터를 읽고 생성하는 프로그래밍 기법이다.
- Chunk(청크): 아이템이 트랜잭션에서 커밋되는 수
- read한 데이터 수가 지정한 청크 단위와 일치하면 write를 수행하고 트랜잭션을 커밋한다.
- 청크 지향 프로세싱의 이점:
- 1000여 개의 데이터에 대해 배치 로직을 실행한다고 했을 때, 청크로 나누지 않았을 때는 하나만 실패해도 다른 성공한 999개의 데이터가 롤백된다.
- 그런데 청크 단위를 10개로 지정하면, 도중에 배치 처리에 실패하더라도 해당 10개의 데이터에 대해서만 롤백하면 된다.
- 예제 코드:
package com.example.batch.jobs;
import com.example.batch.user.User;
import com.example.batch.user.enums.UserStatus;
import com.example.batch.user.repository.UserRepository;
import lombok.AllArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
@AllArgsConstructor
@Configuration
public class InactiveUserJobConfig {
private final static int CHUNK_SIZE = 15;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, Step inactiveJobStep){
return jobBuilderFactory.get("inactiveUserJob")
.preventRestart()
.start(inactiveJobStep)
.build();
}
@Bean
public Step inactiveJobStep(StepBuilderFactory stepBuilderFactory, ListItemReader<User> inactiveUserReader){
return stepBuilderFactory.get("inactiveUserStep")
.<User, User> chunk(CHUNK_SIZE) // chunk 단위로 붂어서 writer() 메서드 실행
.reader(inactiveUserReader)
.processor(inactiveUserProcessor())
.writer(inactiveUserWriter())
.build();
}
@Bean
@StepScope // step 주기에 따라 새로운 빈을 생성
public ListItemReader<User> inactiveUserReader(@Value("#{jobParameters[nowDate]}") Date nowDate, UserRepository userRepository) {
LocalDateTime now = LocalDateTime.ofInstant(nowDate.toInstant(), ZoneId.systemDefault());
List<User> inactiveUsers = userRepository.findByUpdatedDateBeforeAndStatusEquals(now.minusYears(1), UserStatus.ACTIVE);
return new ListItemReader<>(inactiveUsers);
}
public ItemProcessor<User, User> inactiveUserProcessor(){
return User::setInactive;
}
private JpaItemWriter<User> inactiveUserWriter(){
// JpaItemWriter는 별도로 저장 설정 필요 없음
// 제네릭에 저장할 타입 명시하고 EntityManagerFactory만 설정하면 Processor에서 넘어온 데이터를 청크 단위로 저장함.
JpaItemWriter<User> jpaItemWriter = new JpaItemWriter<>();
jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
return jpaItemWriter;
}
}
Tasklet
- 청크 지향 프로세싱이 아닌 방식
- Tasklet은 임의의 step을 실행할 때 하나의 작업으로 처리하는 방식
- Tasklet: 읽기, 처리, 쓰기를 단일 작업으로 만드는 개념
- 청크 지향 프로세싱: 읽기, 처리, 쓰기로 나뉜 방식
- 예제 코드:
package com.example.batch.jobs;
import com.example.batch.user.User;
import com.example.batch.user.enums.UserStatus;
import com.example.batch.user.repository.UserRepository;
import lombok.AllArgsConstructor;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@Component
@AllArgsConstructor
public class InactiveItemTasklet implements Tasklet {
private UserRepository userRepository;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
// reader
Date nowDate = (Date) chunkContext.getStepContext().getJobParameters().get("nowDate");
LocalDateTime now = LocalDateTime.ofInstant(nowDate.toInstant(), ZoneId.systemDefault());
List<User> inactiveUsers = userRepository.findByUpdatedDateBeforeAndStatusEquals(now.minusYears(1), UserStatus.ACTIVE);
// processor
inactiveUsers = inactiveUsers.stream()
.map(User::setInactive)
.collect(Collectors.toList());
// writer
userRepository.saveAll(inactiveUsers);
return RepeatStatus.FINISHED;
}
}
Listener 설정
- 배치 흐름에서 전후 처리를 하는 Listener를 설정할 수 있다.
- Job의 전후 처리, Step의 전후 처리, 각 청크 단위에서의 전후 처리 등 세세한 과정 실행 시 특정 로직을 할당해 제어할 수 있다.
- Listener 설정하는 방법은 두 가지이다.
- JobExecutionListener 인터페이스 이용 VS 어노테이션 이용
- 그 후, Listener를 Job 설정에 등록
1. Listener 설정: JobExecutionListener 인터페이스 이용
package com.example.batch.jobs.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class InactiveIJobListener implements JobExecutionListener {
// 배치 흐름에서 전후 처리를 함
// 인터페이스를 이용한 Listener
@Override
public void beforeJob(JobExecution jobExecution){
log.info("Before Job");
}
public void afterJob(JobExecution jobExecution){
log.info("After Job");
}
}
2. Listener 설정: 어노테이션 사용
package com.example.batch.jobs.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class InactiveStepListener {
// 어노테이션을 이용한 Listener
@BeforeStep
public void beforeStep(StepExecution stepExecution){
log.info("Before Step");
}
@AfterStep
public void afterStep(StepExecution stepExecution){
log.info("After Step");
}
}
Listener 설정: Job 설정에 Listener 등록
@Bean
public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveIJobListener inactiveIJobListener, Step inactiveJobStep){
return jobBuilderFactory.get("inactiveUserJob")
.preventRestart()
.listener(inactiveIJobListener)
.start(inactiveJobStep)
.build();
}
@Bean
public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveStepListener inativeStepListener, Step inactiveJobStep){
return jobBuilderFactory.get("inactiveUserJob")
.preventRestart()
.listener(inactiveStepListener)
.start(inactiveJobStep)
.build();
}
Step의 흐름을 제어하는 Flow
- [읽기-처리-쓰기]의 기본 Step 과정에 세부적인 조건을 추가할 수 있다.
- 예: 조건에 따른 flow
- 랜덤 정숫값을 생성해서
- 양수이면 InactiveJobStep을 실행
- 음수이면 동작 없음
- 조건에 해당하는 부분을 JobExecutionDecider 인터페이스를 사용해 구현할 수 있다.
JobExecutionDecider 인터페이스
- decide() 메서드 하나만 제공
- 반환값: FlowExecutionStatus 객체
- FlowExecutionStatus: 상탯값 COMPLETED, STOPPED, FAILED, UNKNOWN 등을 제공
- 이 상태값으로 Step 실행 여부를 판별하도록 설정할 수 있다.
package com.example.batch.jobs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import java.util.Random;
@Slf4j
public class InactiveJobExecutionDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution){
if(new Random().nextInt()>0){
log.info("FlowExecutionStatus.COMPLETE");
return FlowExecutionStatus.COMPLETED;
}
log.info("FlowExecutionStatus.FAILED");
return FlowExecutionStatus.FAILED;
}
}
// InactiveUserJobConfig.class
@Bean
public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveStepListener inactiveStepListener, Flow inactiveJobFlow){
return jobBuilderFactory.get("inactiveUserJob")
.preventRestart()
.listener(inactiveStepListener)
.start(inactiveJobFlow)
.end()
.build();
}
@Bean
public Flow inactiveJobFlow(Step inactiveJobStep){
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("inactiveJobFlow");
return flowBuilder
.start(new InactiveJobExecutionDecider())
.on(FlowExecutionStatus.FAILED.getName()).end()
.on(FlowExecutionStatus.COMPLETED.getName()).to(inactiveJobStep)
.end();
}
출처:
- [처음 배우는 스프링 부트 2]
반응형
'개발 > Spring Boot' 카테고리의 다른 글
Comments