티스토리 뷰

Backend

KafkaItemReader 적용기

2022. 8. 31. 14:45

KafkaItemReader 적용기

안녕하세요 VIP & Vertical 팀 김윤제입니다.

VIP 파트에서 상품 상세 페이지 및 리뷰 업무를 맡고 있습니다.

최근 리뷰 개편 작업에 Kafka 도입을 해보며 주니어 개발자로서 겪고 배운 내용을 공유드리기 위해 작성했습니다.

부족하게 쓴 내용일지라도 도움이 되었으면 좋겠습니다.

 


 

Kafka를 선택한 이유

제가 속한 VIP & Vertical팀 (VIP 파트)에서 Kafka를 사용하게 된 이유는 크게 다음과 같이 두 가지 이유가 있었습니다.

  1. RDB의 부하 조절을 위해 미들웨어인 메시징 플랫폼을 두어 데이터를 처리

  2. Review 등록 이후에 같은 데이터를 가지고 여러 가지 작업(Junk 판독, Review Summary) 등의 배치 작업을 하는
    병렬 프로세싱을 원했습니다. (데이터 파이프라인의 확장)

다른 메시징 플랫폼도 있었을텐데 ?

  • Kafka는 다른 메시징 플랫폼과는 달리 Pub/Sub 기반으로 메시지를 pull 하는 방식입니다.
    Consumer가 처리할 수 있을 때 메시지를 가져오므로 자원을 효율적으로 사용합니다.

  • 가장 큰 이유는 바로 ConsumerGroup의 offset 처리 기능 때문입니다.
    Kafka에서 ConsumerGroup은 바라보고 있는 Topic에 대한 Partition Offset관리를 별도로 하여
    여러 ConsumerGroup을 사용하여 동일한 Topic을 바라보더라도 같은 데이터를 수신할 수 있습니다.

KafkaItemReader를 선택하기까지의 과정

이번 작업에서의 요구사항은 Kafka Consumer를 통해 가져온 데이터들을 Spring Batch와 연동하여
일괄 처리하는 것이었습니다.

프로젝트 시작 당시에 Kafka에 대해 지식이 부족하여, Consumer를 통해 데이터를
List형식으로 받아오기만 하면 될 것 같았습니다.

어렵지 않게 Consumer를 통해 List 형식으로 받아왔지만 다음과 같은 문제가 발생하였습니다.

  • Spring Batch와 연동하여 데이터를 처리하려면 Reader 쪽에 Parameter로 넘겨줘야 하는데..
    파라미터에 리스트를 담을 수 있나..?

Spring Batch Job Parameter의 가능한 type 을 보니... Long, String, Double, Date가 있었습니다.

String을 보자 저는 어? 그럼 String 형식으로 ', '로 붙여서 보낼까...? 하는 잠시 어리석은 생각을 했었지만
데이터의 크기는 점점 커질 것 이고 String의 길이도 250 chars 제한이 있어 빠르게 포기하고 고민에 빠졌습니다.


Spring Batch를 굳이 써야 할까라는 의견도 있었지만 우리는 Spring Batch에서 제공하는 강력한 기능인
Metatable 관리 때문에 반드시 사용해야 했습니다.

 

이러한 고민을 같은 파트원께 공유드렸더니,
Spring Batch에서도 Kafka를 위한 ItemReader 같은 게 있을 거 같다고 도움을 주셨습니다.

바로 찾아보니 KafkaItemReader라는 것이 있었고, 최근에 나온 내용인지 관련된 내용이 많지 않았습니다.

 

그래서 준비했습니다. KafkaItemReader 아래에서 보시겠습니다.


KafkaItemReader In Spring Batch 사용 방법

@Bean
@StepScope
KafkaItemReader<ReviewRequestedEvent, ReviewRequestedEvent> junkItemReader() {
    Properties props = JobConfig.createProperty(kafkaProperties,ConsumerGroup.REVIEW_JOB);
    return new KafkaItemReaderBuilder<ReviewRequestedEvent, ReviewRequestedEvent>()
        .partitions(0,1,2)
        .partitionOffsets(new HashMap<>())
        .consumerProperties(props)
        .name("junkItemReader")
        .saveState(true)
        .pollTimeout(Duration.ofSeconds(6L))
        .topic(topic)
        .build();
}

KafkaItemReader를 지원하는 KafkaItemReaderBuilder를 이용하여 사용했으며 각각의 파라미터 타입들에 대해서는
공식 홈페이지에 아래와 같이 나와있습니다.

여기서 유심히 지켜봐야 할 내용은 consumerProperties, pollTimeout과 partitons, partitionOffsets입니다

ConsumerProperties 관련한 내용은 아래의 공식 홈페이지에서 확인할 수 있습니다.
https://kafka.apache.org/documentation/#consumerconfigs


KafkaItemReader 동작 과정

KafkaItemReader는 내부에 KafkaConsumer를 가지고 있습니다.

public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {

    private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";

    private static final long DEFAULT_POLL_TIMEOUT = 30L;

    private List<TopicPartition> topicPartitions;

    private Map<TopicPartition, Long> partitionOffsets;

    private KafkaConsumer<K, V> kafkaConsumer;

    private Properties consumerProperties;

    private Iterator<ConsumerRecord<K, V>> consumerRecords;

    private Duration pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT);

    private boolean saveState = true;


주요 동작 메서드는 open, update, read, close가 있습니다.

KafkaItemReader는 open()-> update()-> read() -> update() -> close() 순으로 동작하며

open 메서드의 경우 컨슈머 속성 설정 및 파티션 할당등의 작업이 이루어집니다.

@Override
public void open(ExecutionContext executionContext) {
    this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
    if (this.partitionOffsets == null) {
        this.partitionOffsets = new HashMap<>();
        for (TopicPartition topicPartition : this.topicPartitions) {
            this.partitionOffsets.put(topicPartition, 0L);
        }
    }
    if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
        Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
        }
    }
    this.kafkaConsumer.assign(this.topicPartitions);
    this.partitionOffsets.forEach(this.kafkaConsumer::seek);
}


update 메소드의 경우 KafkaConsumer의 commitSync를 통하여 offset을 커밋하고 있습니다.

@Override
public void update(ExecutionContext executionContext) {
    if (this.saveState) {
        executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
    }
    this.kafkaConsumer.commitSync();
}


read 메서드는 KafkaItemReader의 핵심 메서드로 KafkaConsumer를 통해 Topic에 저장된 데이터들을 가져옵니다.

KafkaConsumer는 poll 메서드를 호출하여 Topic에 저장된 데이터들을 레코드에 담아 가져옵니다

그 이후 레코드가 존재한다면 해당 레코드의 값을 반환합니다.

@Nullable
@Override
public V read() {
    if (this.consumerRecords == null || !this.consumerRecords.hasNext()) {
        this.consumerRecords = this.kafkaConsumer.poll(this.pollTimeout).iterator();
    }
    if (this.consumerRecords.hasNext()) {
        ConsumerRecord<K, V> record = this.consumerRecords.next();
        this.partitionOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        return record.value();
    }
    else {
        return null;
    }
}


close 메서드는 KafkaConsumer의 read -> update 이후에 호출되며 현재 실행된 kafkaCousumer를 종료합니다.

@Override
public void close() {
    if (this.kafkaConsumer != null) {
        this.kafkaConsumer.close();
    }
}

Trouble Shooting

KafkaItemReader를 사용하여 Kafka에 저장된 데이터들을 Consume 하여 배치 처리를 테스트하였습니다.

잘 처리가 되었으면 좋겠지만, 항상 난관은 존재하는 법이죠..

아래와 같이 여러 문제가 있었습니다.


첫 번째로는 Offset 관리가 안 되는 것이었습니다.

애플리케이션을 재실행할 경우에 ConsumerGroup이 바라보고 있는 Topic Partition의 Offset이 초기화가 되어서
제일 처음의 Offset부터 데이터를 가져오는 것입니다.

 

무엇이 문제였을까 열심히 찾다가 KafkaItemReader는 Builder 패턴을 적용한 KafkaItemReaderBuilder를 사용하고 있었는데 Spring Batch 4.36 버전 밑으로는 사용한 Offset을 기록하는 옵션(partitionOffsets)이 존재하지 않았던 문제였습니다.

  • Spring Batch 4.2.8 Version

 

두 번째 문제로는 바로 Default PollTimeout (30초)으로 인해 30초 동안은 계속해서 Kafka에 Poll()을 하여 데이터를 가져와서 배치작업을 하던 것입니다.

이에 적절한 fetch.min.bytes, max.poll.records, pollTimeout 설정이 필요했습니다.

 

세 번째 문제로는 KafkaIteamReader는 Partition 수에 따른 컨슈머 수 설정이 불가능 한 점이었습니다

KafkaIteamReader는 바라볼 partition에 대해 List로 인자를 받지만... 실제로 KafkaConsumer는 하나만 할당되어
메시지를 소비하는 것입니다.

(소비하는 KafkaConsumer가 한 개뿐이라면 partition을 여러 개 쓰는 의미가 없어집니다.)

 

아래 테스트 결과는 파티션 3개 설정 후 KafkaItemReader의 Consumer 동작 로그입니다.

동일한 client_id가 파티션 3개에 대해 접근하고 있습니다.

이 부분은 많은 메시지 처리가 필요하여 Partition 수에 따른 컨슈머 설정이 필요한 분께는 문제가 되는 부분입니다.

이에 대한 해결책으로 아래와 같은 두 가지를 생각해보았지만 결국 다른 선택을.. 하였습니다

  • 동일 ConsumerGroup에 partition 별로 KafkaItemReader를 사용하는 방안도 있지만
    코드가 상당히 지저분해질 것 같습니다.
    (추후의 처리량의 문제가 생긴다면 이 방안을 적용해야 할 것 같습니다.)

  • KafkaItemReader 자체를 Thread를 두어 실행하면 어떠할까
    • KafkaConsumer는 Thread Safe 하지 않기에 빠르게 포기하였습니다.

저의 경우는 많은 메시지의 처리가 아니었기 때문에 partition 하나에 KafkaConsumer를 한 개를 사용하여도 크게 문제가 되진 않을 것이라 판단되었습니다.

 

네 번째 문제로는 Batch Metatable의 실패 처리였습니다.

배치 실행 시에 실패한 데이터들에 대해서는 별도로 재배치작업을 하거나 관리자 페이지에서 수동으로
실행할 계획을 갖고 있었습니다

 

하지만 배치 작업이 실패했을 경우에 어떤 데이터 작업에서 실패한 지를 알 수가 없었습니다.

Metatable을 자세히 살펴보니 batch_job_execution 테이블의 STATUS, EXIT_CODE에 따른 EXIT_MESSAGE가
관리되고 있는데 이 메시지를 수정하여 사용하면 우리가 원하는 실패 케이스에 대한 관리를 할 수 있을 것으로
판단했습니다.

 

이에 아래와 같이 Job 실행 시에 JobExecutionListener를 두어 발생하는 에러 메시지를 가로챈 후 우리가 원하는 메시지로 변경하였고 batch_job_execution 테이블 EXIT_MESSAGE 컬럼에는 변경된 메시지가 저장되었습니다.

 

@Bean
Job junkJob() {
    return jobBuilderFactory.get(JUNK_JOB_NAME)
        .incrementer(new UniqueRunIdIncrementer())
        .listener(jobListenerConfig) //JobExecutionListener
        .start(junkJobStart())
        .build();
}

 

오류의 원본 메시지는 batch_step_execution에서 관리하였습니다.

 


마치며

여기까지 저의 글을 읽어 주셔서 감사합니다.

이 글을 쓰는 시점에도 Kafka의 지식이 부족하여 제가 미처 알지 못했던 부분도 있을 거라 생각합니다.

KafkaCousumer와 밀접한 내용이지만 내용이 방대해 다음 기회에 별도의 챕터로 블로깅 하겠습니다.

감사합니다.

 


Reference

고승범 & 공용준. 『카프카, 데이터 플랫폼의 최강자』. 책만, 2018.

https://docs.spring.io/spring-batch/docs/4.3.x/api/org/springframework/batch/item/kafka/builder/KafkaItemReaderBuilder.html

https://kafka.apache.org/documentation/

댓글