관리 메뉴

너와 나의 스토리

[Spring] 스프링 부트 배치 기본 지식과 설정: 용어 정리, Chunk 지향 프로세싱/Tasklet & Listener 설정 / Step 흐름 제어 본문

개발/Spring Boot

[Spring] 스프링 부트 배치 기본 지식과 설정: 용어 정리, Chunk 지향 프로세싱/Tasklet & Listener 설정 / Step 흐름 제어

노는게제일좋아! 2021. 1. 18. 13:57
반응형

Spring Boot Batch 설명

 

배치(Batch)란?

  • 프로그램의 흐름에 따라 순차적으로 자료를 처리한다는 뜻
  • 배치 처리 = 일괄 처리
  • ex) 휴면회원 전환 기능 -> 대용량 데이터를 일괄 처리

 

Job

  • 배치 처리 과정을 하나의 단위로 만들어 표현한 객체
  • 하나의 job 안에는 여러 step이 있다. 각 step을 배치의 기본 흐름대로 구현하다.
  • Job 객체를 만드는 빌더는 여러 개 있다. 여러 빌더를 통합 처리하는 공장인 JobBuilderFactory로 원하는 Job을 쉽게 만들 수 있다.
  • Job은 Step 또는 Flow 인스턴스의 컨테이너 역할을 하기 때문에 생성하기 전에 인스턴스를 전달받는다.

 

JobBuilderFactory 클래스

  • JobBuilderFactory 클래스의 get() 메서드로 JobBuilder를 생성할 수 있다.
    • JobBuilder get(String name);
  • 이렇게 생성된 JobBuilder로 Job을 생성한다.

 

JobBuilder의 메서드

  • SimpleJobBuilder start(Step step);
    • Step을 추가해서 가장 기본이 되는 SimpleJobBuilder를 생성
  • JobFlowBuilder start(Flow flow);
    • Flow를 실행할 JobFlowBuilder를 생성
  • JobFlowBuilder flow(Step step);
    • Step을 실행할 JobFlowBuilder를 생성

 

<Job 생성 예제>

  • InactiveUserConfig.java
  • 일괄적으로 휴면 처리하기 위한 Job
  • 해당 Job을 Bean으로 등록하는 configuration 코드
package com.example.batch.jobs;

import lombok.AllArgsConstructor;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.Job;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@AllArgsConstructor
@Configuration
public class InactiveUserConfig {

    @Bean
    public Job InactiveUserJob(JobBuilderFactory jobBuilderFactory, Step inactiveJobStep){
        return jobBuilderFactory.get("inactiveUserJob")
                .preventRestart()
                .start(inactiveJobStep)
                .build();
    }
}

 

JobInstance

  • JobInstance는 배치에서 Job이 실행될 때 하나의 Job 실행 단위이다.
  • 하루에 한 번씩 배치의 Job이 실행된다면, 각각의 Job을 JobInstance라고 부를 수 있다. 
  • 오늘 Job을 실행했는데 실패했다면 다음날 동일한 JobInstance를 가지고 다시 실행하게 된다.
    • Job 실행이 실패하면 JobInstance가 끝난 것으로 간주하지 않기 때문이다.
    • 그러면, 이 하나의 JobInstance는 어제 실패한 JobExecution과 오늘의 성공한 JobExecution 두 개를 가지게 된다.
    • 즉, 하나의 JobInstance는 여러 개의 JobExecution을 가지게 된다.

 

JobExecution

  • JobExecution은 JobInstance에 대한 한 번의 실행을 나타내는 객체이다.
  • JobExecution은 Job 실행에 대한 정보를 담고 있는 도메인 객체이다.
    • JobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 실패했을 때의 메시지 등의 정보를 담고 있다.

 

 

Step

  • Step은 실질적인 배치 처리를 정의하고 제어하는 데 필요한 모든 정보가 들어 있는 도메인 객체이다.
  • Job을 처리하는 실질적인 단위
  • 모든 Job은 1개 이상의 Step을 가진다.

 

StepExecution

  • Step 실행 정보를 담는 객체.
  • 각각의 Step이 실행될 때마다 StepExecution이 생성된다.

 

 

JobRepository

  • JobRepository는 배치 처리 정보를 담고 있는 메커니즘이다.
  • 어떤 Job이 실행되었으며 몇 번 실행되었고 언제 끝났는지 등 배치 처리에 대한 메타데이터를 저장한다.
  • 예: Job 하나가 실행되면 JobRepository에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution을 생성한다.
  • JobRepository는 StepExecution도 저장소에 저장하며 전체 메타데이터를 저장/관리하는 역할을 수행한다.

 

 

JobLauncher

  • JobLauncher는 Job, JobParameters와 함께 배치를 실행하는 인터페이스이다. 
  • run() 메서드 하나만 가짐.
    • JobExecution run(Job job, JobParameters jobParameters);

 

 

ItemReader

  • ItemReader는 Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스이다.
  • FILE, XML, DB 등 여러 타입의 데이터를 읽어올 수 있다.
  • "ListItemReader<> 객체"를 사용하면 모든 데이터를 한 번에 가져와 메모리에 올려놓고 read() 메서드로 하나씩 배치 처리 작업을 수행할 수 있다.
  • 메서드:
    • T read();

 

ItemProcessor

  • ItemProcessor은 ItemReader로 읽어온 배치 데이터를 변환하는 역할을 수행한다.
  • 메서드:
    • 0 process(I item);

 

ItemWriter

  • ItemWriter는 배치 데이터를 저장한다. 
  • 일반적으로 DB나 파일에 저장한다.

 

 

Chunk 지향 프로세싱

  • Chunk oriented processing은 트랜잭션 경계 내에서 청크 단위로 데이터를 읽고 생성하는 프로그래밍 기법이다.
  • Chunk(청크): 아이템이 트랜잭션에서 커밋되는 수
    • read한 데이터 수가 지정한 청크 단위와 일치하면 write를 수행하고 트랜잭션을 커밋한다.
  • 청크 지향 프로세싱의 이점:
    • 1000여 개의 데이터에 대해 배치 로직을 실행한다고 했을 때, 청크로 나누지 않았을 때는 하나만 실패해도 다른 성공한 999개의 데이터가 롤백된다.
    • 그런데 청크 단위를 10개로 지정하면, 도중에 배치 처리에 실패하더라도 해당 10개의 데이터에 대해서만 롤백하면 된다.
  • 예제 코드:
package com.example.batch.jobs;


import com.example.batch.user.User;
import com.example.batch.user.enums.UserStatus;
import com.example.batch.user.repository.UserRepository;
import lombok.AllArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;

@AllArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final static int CHUNK_SIZE = 15;
    private final EntityManagerFactory entityManagerFactory;
    
    @Bean
    public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, Step inactiveJobStep){
        return jobBuilderFactory.get("inactiveUserJob")
                .preventRestart()
                .start(inactiveJobStep)
                .build();
    }

    @Bean
    public Step inactiveJobStep(StepBuilderFactory stepBuilderFactory, ListItemReader<User> inactiveUserReader){
        return stepBuilderFactory.get("inactiveUserStep")
                .<User, User> chunk(CHUNK_SIZE) // chunk 단위로 붂어서 writer() 메서드 실행
                .reader(inactiveUserReader)
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope // step 주기에 따라 새로운 빈을 생성
    public ListItemReader<User> inactiveUserReader(@Value("#{jobParameters[nowDate]}") Date nowDate, UserRepository userRepository) {
        LocalDateTime now = LocalDateTime.ofInstant(nowDate.toInstant(), ZoneId.systemDefault());
        List<User> inactiveUsers = userRepository.findByUpdatedDateBeforeAndStatusEquals(now.minusYears(1), UserStatus.ACTIVE);
        return new ListItemReader<>(inactiveUsers);
    }

    public ItemProcessor<User, User> inactiveUserProcessor(){
        return User::setInactive;
    }

    private JpaItemWriter<User> inactiveUserWriter(){
        // JpaItemWriter는 별도로 저장 설정 필요 없음
        // 제네릭에 저장할 타입 명시하고 EntityManagerFactory만 설정하면 Processor에서 넘어온 데이터를 청크 단위로 저장함.
        JpaItemWriter<User> jpaItemWriter = new JpaItemWriter<>();
        jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
        return jpaItemWriter;
    }

}

 

 

Tasklet

  • 청크 지향 프로세싱이 아닌 방식
  • Tasklet은 임의의 step을 실행할 때 하나의 작업으로 처리하는 방식
  • Tasklet: 읽기, 처리, 쓰기를 단일 작업으로 만드는 개념
    • 청크 지향 프로세싱: 읽기, 처리, 쓰기로 나뉜 방식
  • 예제 코드:
package com.example.batch.jobs;

import com.example.batch.user.User;
import com.example.batch.user.enums.UserStatus;
import com.example.batch.user.repository.UserRepository;
import lombok.AllArgsConstructor;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Component
@AllArgsConstructor
public class InactiveItemTasklet implements Tasklet {

    private UserRepository userRepository;
    
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
        // reader
        Date nowDate = (Date) chunkContext.getStepContext().getJobParameters().get("nowDate");
        LocalDateTime now = LocalDateTime.ofInstant(nowDate.toInstant(), ZoneId.systemDefault());
        List<User> inactiveUsers = userRepository.findByUpdatedDateBeforeAndStatusEquals(now.minusYears(1), UserStatus.ACTIVE);
        
        // processor
        inactiveUsers = inactiveUsers.stream()
                .map(User::setInactive)
                .collect(Collectors.toList());
        
        // writer
        userRepository.saveAll(inactiveUsers);
                
        return RepeatStatus.FINISHED;
    }
}

 

 

Listener 설정

  • 배치 흐름에서 전후 처리를 하는 Listener를 설정할 수 있다.
  • Job의 전후 처리, Step의 전후 처리, 각 청크 단위에서의 전후 처리 등 세세한 과정 실행 시 특정 로직을 할당해 제어할 수 있다.
  • Listener 설정하는 방법은 두 가지이다.
    • JobExecutionListener 인터페이스 이용 VS 어노테이션 이용
    • 그 후, Listener를 Job 설정에 등록

 

 

1. Listener 설정: JobExecutionListener 인터페이스 이용

package com.example.batch.jobs.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class InactiveIJobListener implements JobExecutionListener {
    // 배치 흐름에서 전후 처리를 함
    // 인터페이스를 이용한 Listener

    @Override
    public void beforeJob(JobExecution jobExecution){
        log.info("Before Job");
    }

    public void afterJob(JobExecution jobExecution){
        log.info("After Job");
    }
}

 

 

2. Listener 설정: 어노테이션 사용

package com.example.batch.jobs.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class InactiveStepListener {
    // 어노테이션을 이용한 Listener

    @BeforeStep
    public void beforeStep(StepExecution stepExecution){
        log.info("Before Step");
    }

    @AfterStep
    public void afterStep(StepExecution stepExecution){
        log.info("After Step");
    }
}

 

 

Listener 설정: Job 설정에 Listener 등록

    @Bean
    public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveIJobListener inactiveIJobListener,  Step inactiveJobStep){
        return jobBuilderFactory.get("inactiveUserJob")
                .preventRestart()
                .listener(inactiveIJobListener)
                .start(inactiveJobStep)
                .build();
    }
    @Bean
    public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveStepListener inativeStepListener, Step inactiveJobStep){
        return jobBuilderFactory.get("inactiveUserJob")
                .preventRestart()
                .listener(inactiveStepListener)
                .start(inactiveJobStep)
                .build();
    }

 

 

 

Step의 흐름을 제어하는 Flow

  • [읽기-처리-쓰기]의 기본 Step 과정에 세부적인 조건을 추가할 수 있다.
  • 예: 조건에 따른 flow
    • 랜덤 정숫값을 생성해서
    • 양수이면 InactiveJobStep을 실행
    • 음수이면 동작 없음
  • 조건에 해당하는 부분을 JobExecutionDecider 인터페이스를 사용해 구현할 수 있다.

 

JobExecutionDecider 인터페이스

  • decide() 메서드 하나만 제공
    • 반환값: FlowExecutionStatus 객체
    • FlowExecutionStatus: 상탯값 COMPLETED, STOPPED, FAILED, UNKNOWN 등을 제공 
    • 이 상태값으로 Step 실행 여부를 판별하도록 설정할 수 있다.
package com.example.batch.jobs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

import java.util.Random;

@Slf4j
public class InactiveJobExecutionDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution){
        if(new Random().nextInt()>0){
            log.info("FlowExecutionStatus.COMPLETE");
            return FlowExecutionStatus.COMPLETED;
        }
        log.info("FlowExecutionStatus.FAILED");
        return FlowExecutionStatus.FAILED;
    }
}

 

// InactiveUserJobConfig.class

@Bean
public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, InactiveStepListener inactiveStepListener, Flow inactiveJobFlow){
  return jobBuilderFactory.get("inactiveUserJob")
    .preventRestart()
    .listener(inactiveStepListener)
    .start(inactiveJobFlow)
    .end()
    .build();
}

@Bean
public Flow inactiveJobFlow(Step inactiveJobStep){
  FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("inactiveJobFlow");
  return flowBuilder
    .start(new InactiveJobExecutionDecider())
    .on(FlowExecutionStatus.FAILED.getName()).end()
    .on(FlowExecutionStatus.COMPLETED.getName()).to(inactiveJobStep)
    .end();
}

 

 

 

 

출처:

- [처음 배우는 스프링 부트 2]

 

 

반응형
Comments