HEROJOON 블로그(히로블)

kafka error handler사용하기 본문

Kafka

kafka error handler사용하기

herojoon 2022. 4. 7. 02:18
반응형

환경

  • Framework : Spring Boot 2.6.6
  • Build : Gradle 6.5.1
  • JDK : JDK11
  • 로컬 환경에 kafka 미리 설치 및 실행하고 아래 코드 테스트 진행하였습니다.
 

windows에 kafka 설치하기

0. 요약 저의 설치 환경: Windows 11 1) windows 설치를 위한 kafka 다운로드 받습니다. 2) kafka 폴더에는 zookeeper, kafka 실행을 위한 bat형식의 배치파일과 zookeeper, kafka config파일이 들어 있습니다. 3)..

herojoon-dev.tistory.com

 

목표

consumer 오류 발생 시 error handler를 사용하여 처리하기

실패 메시지를 처리할 Dead-Topic을 생성하여 실패 메시지 처리될 수 있도록 예제를 작성해보았습니다. 

*여기서 말하는 Dead-Topic은 일반 토픽이지만 실패 메시지를 처리할 것이기 때문에 제가 토픽의 이름을 Dead-Topic이라고 지은 것입니다. kafka에서 자동으로 제공해주는 토픽은 아니예요.

 

 

해보기 요약

1. build.gradle에 kafka lib 추가

2. application.yaml에 kafka 연결 정보 추가

3. producer, comsumer, errorHandler 코드 작성

 

해보기

// build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'  // kafka 추가
    testImplementation 'org.springframework.kafka:spring-kafka-test'  // kafka 추가
}

 

// application.yaml
// group-id 값이 필수 입력으로 변경되었기 때문에 꼭 입력해주셔야 합니다.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
    listener:
      ack-mode: MANUAL_IMMEDIATE
      type: SINGLE
    consumer:
      bootstrap-servers: localhost:9092
      group-id: dev-group

 

// producer 코드

package com.herojoon.kafkaproject.errorhandler.producer;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka Producer 테스트를 위한 Controller
 */
@RequestMapping("kafka/error")
@RestController
@RequiredArgsConstructor
public class ErrorTestProducer {
    // Spring application.yaml에 정의한 kafka 설정이 주입된 kafkaTemplate
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Kafka Producer (Kafka ErrorHandler 테스트)
     * @return
     */
    @GetMapping("producer")
    public String sendMessageForErrorHandler() {
        String errorTopicName = "dev-topic2";
        String messageData = "kafka error test message";
        kafkaTemplate.send(errorTopicName, messageData);
        return "success.";
    }

    /**
     * Kafka Producer (Kafka ErrorHandler 테스트2)
     * @return
     */
    @GetMapping("producer2")
    public String sendMessageForErrorHandlerTwo() {
        String errorTopicName = "dev-topic3";
        String messageData = "kafka error test message two";
        kafkaTemplate.send(errorTopicName, messageData);
        return "success.";
    }
}

 

// consumer 코드

package com.herojoon.kafkaproject.errorhandler.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ErrorTestListener {

    /**
     * Kafka errorHandler를 이용하여 error처리하기 (방법1)
     *
     * <설명>
     * KafkaListenerErrorHandler를 구현해 놓은 KafkaErrorHandler.class의 이름을
     * @KafkaListener(errorHandler="이곳에 입력해줍니다.")
     * KafkaErrorHandler의 앞자리를 소문자로 입력해줍니다.
     *
     * <KafkaListenerErrorHandler 구현 위치>
     *  com.herojoon.kafkaproject.errorhandler.exception.KafkaErrorHandler
     *
     * @param record
     */
    @KafkaListener(topics = "dev-topic2", errorHandler = "kafkaErrorHandler")
    public void errorTestListener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("### record: " + record.toString());
        log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
        throw new KafkaException("무슨무슨 에러가 발생하였다..!!");  // KafkaListenerErrorHandler를 테스트하기 위해 에러를 발생시킵니다.
    }

    /**
     * Kafka errorHandler를 이용하여 error처리하기 (방법2)
     *
     * <설명>
     * KafkaListenerErrorHandler를 구현해 놓은 KafkaErrorHandlerTwo()의 이름을
     * @KafkaListener(errorHandler="이곳에 입력해줍니다.")
     * KafkaErrorHandlerTwo의 앞자리를 소문자로 입력해줍니다.
     *
     * <KafkaListenerErrorHandler 구현 위치>
     * com.herojoon.kafkaproject.errorhandler.config.kafkaConfig의 KafkaErrorHandlerTwo()
     *
     * @param record
     */
    @KafkaListener(topics = "dev-topic3", errorHandler = "kafkaErrorHandlerTwo")
    @SendTo("dev-dead-topic")
    public void errorTestListenerTow(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("### record: " + record.toString());
        log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
        throw new KafkaException("무슨무슨 에러가 발생하였다..!! Two");  // KafkaListenerErrorHandler를 테스트하기 위해 에러를 발생시킵니다.
    }

    /**
     * 실패한 Kafka 메시지를 전달받아 처리하는 Consumer
     *
     * @param record
     * @param acknowledgment
     */
    @KafkaListener(topics = "dev-dead-topic")
    public void deadTopicConsumer(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("### [dead-topic] record: " + record.toString());
        log.info("### [dead-topic] topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());

        // kafka 메시지 읽어온 곳까지 commit. (이 부분을 하지 않으면 메시지를 소비했다고 commit 된 것이 아니므로 계속 메시지를 읽어온다)
        acknowledgment.acknowledge();
    }
}

 

// error handler 코드 (방법1)

package com.herojoon.kafkaproject.errorhandler.exception;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaErrorHandler implements KafkaListenerErrorHandler {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private static String DEAD_TOPIC_NAME = "dev-dead-topic";

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    /**
     * kafka error 발생 시 처리
     *
     * <kafka error 발생 시 처리 방법>
     * 에러 발생 시 로그를 기록하거나 error처리를 위한 kafka topic으로 재전송하는 동작을 할 수 있습니다.
     *
     * @param message
     * @param exception
     * @param consumer
     * @return
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        /**
         * error 로그 기록
         */
        log.error("[KafkaErrorHandler] kafkaMessage=[" + message.getPayload() + "], errorMessage=[" + exception.getMessage() + "]");

        /**
         * 1) 원하는 메시지 값을 뽑아서 비교 처리나 조건 처리를 할 수도 있습니다.
         * 2) 혹은 원하는 내용만 error 로그로 기록할 수 있습니다.
         * 3) 혹은 실패 메시지 kafka topic으로 재전송 (실패 메시지를 처리할 dead topic을 별도로 생성해놓고 실패 메시지를 전송하여 처리하도록 합니다.)
         */
        ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) message.getPayload();

        // 1) 원하는 메시지 값을 뽑아서 비교 처리나 조건 처리를 할 수도 있습니다.
        if (record.key() == "my key") {
            // 처리
        }
        // 2) 혹은 원하는 내용만 error 로그로 기록할 수 있습니다.
        log.error("[KafkaErrorHandler] topic=[" + record.topic() + "], value=[" + record.value() + "]");
        // 3) 혹은 실패 메시지 kafka topic으로 재전송 (실패 메시지를 처리할 dead topic을 별도로 생성해놓고 실패 메시지를 전송하여 처리하도록 합니다.)
        kafkaTemplate.send(DEAD_TOPIC_NAME, record.value());
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, null));
        consumer.commitSync(offsets); // offset commit. (메시지 처리한 곳 표시해줌.)

        return null;
    }
}

 

// error handler 코드 (방법2)

package com.herojoon.kafkaproject.errorhandler.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;

@Slf4j
@EnableKafka
@Configuration
public class KafkaConfig {

    /**
     * kafka error 발생 시 처리
     *
     * <kafka error 발생 시 처리 방법>
     * 에러 발생 시 로그를 기록하거나 error처리를 위한 kafka topic으로 재전송하는 동작을 할 수 있습니다.
     *
     * @return
     */
    @Bean
    public KafkaListenerErrorHandler kafkaErrorHandlerTwo() {
        return (m, e) -> {
            /**
             * error 로그 기록
             */
            log.error("[KafkaErrorHandler] kafkaMessage=[" + m.getPayload() + "], errorMessage=[" + e.getMessage() + "]");

            ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) m.getPayload();
            // 메시지를 더 가공하거나 별도 처리를 하고..
            
            return record.value();  // sendTo("토픽명")에 입력된 토픽으로 전달 될 메시지 내용 
        };
    }
}
반응형

'Kafka' 카테고리의 다른 글

kafka 메시지 보내기  (0) 2022.04.06
windows에 kafka ui 설치하기  (2) 2022.03.26
windows에 kafka 설치하기  (0) 2022.03.26
Comments