Recent Posts
Recent Comments
HEROJOON 블로그(히로블)
kafka error handler사용하기 본문
반응형
환경
- Framework : Spring Boot 2.6.6
- Build : Gradle 6.5.1
- JDK : JDK11
- 로컬 환경에 kafka 미리 설치 및 실행하고 아래 코드 테스트 진행하였습니다.
- kafka 설치 참고: https://herojoon-dev.tistory.com/118
목표
consumer 오류 발생 시 error handler를 사용하여 처리하기
실패 메시지를 처리할 Dead-Topic을 생성하여 실패 메시지 처리될 수 있도록 예제를 작성해보았습니다.
*여기서 말하는 Dead-Topic은 일반 토픽이지만 실패 메시지를 처리할 것이기 때문에 제가 토픽의 이름을 Dead-Topic이라고 지은 것입니다. kafka에서 자동으로 제공해주는 토픽은 아니예요.
해보기 요약
1. build.gradle에 kafka lib 추가
2. application.yaml에 kafka 연결 정보 추가
3. producer, comsumer, errorHandler 코드 작성
해보기
- 전체 예제 코드: https://github.com/herojoon/kafka-project 의 errorhandler package 코드 참고.
// build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka' // kafka 추가
testImplementation 'org.springframework.kafka:spring-kafka-test' // kafka 추가
}
// application.yaml
// group-id 값이 필수 입력으로 변경되었기 때문에 꼭 입력해주셔야 합니다.
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: all
listener:
ack-mode: MANUAL_IMMEDIATE
type: SINGLE
consumer:
bootstrap-servers: localhost:9092
group-id: dev-group
// producer 코드
package com.herojoon.kafkaproject.errorhandler.producer;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Kafka Producer 테스트를 위한 Controller
*/
@RequestMapping("kafka/error")
@RestController
@RequiredArgsConstructor
public class ErrorTestProducer {
// Spring application.yaml에 정의한 kafka 설정이 주입된 kafkaTemplate
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* Kafka Producer (Kafka ErrorHandler 테스트)
* @return
*/
@GetMapping("producer")
public String sendMessageForErrorHandler() {
String errorTopicName = "dev-topic2";
String messageData = "kafka error test message";
kafkaTemplate.send(errorTopicName, messageData);
return "success.";
}
/**
* Kafka Producer (Kafka ErrorHandler 테스트2)
* @return
*/
@GetMapping("producer2")
public String sendMessageForErrorHandlerTwo() {
String errorTopicName = "dev-topic3";
String messageData = "kafka error test message two";
kafkaTemplate.send(errorTopicName, messageData);
return "success.";
}
}
// consumer 코드
package com.herojoon.kafkaproject.errorhandler.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ErrorTestListener {
/**
* Kafka errorHandler를 이용하여 error처리하기 (방법1)
*
* <설명>
* KafkaListenerErrorHandler를 구현해 놓은 KafkaErrorHandler.class의 이름을
* @KafkaListener(errorHandler="이곳에 입력해줍니다.")
* KafkaErrorHandler의 앞자리를 소문자로 입력해줍니다.
*
* <KafkaListenerErrorHandler 구현 위치>
* com.herojoon.kafkaproject.errorhandler.exception.KafkaErrorHandler
*
* @param record
*/
@KafkaListener(topics = "dev-topic2", errorHandler = "kafkaErrorHandler")
public void errorTestListener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("### record: " + record.toString());
log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
throw new KafkaException("무슨무슨 에러가 발생하였다..!!"); // KafkaListenerErrorHandler를 테스트하기 위해 에러를 발생시킵니다.
}
/**
* Kafka errorHandler를 이용하여 error처리하기 (방법2)
*
* <설명>
* KafkaListenerErrorHandler를 구현해 놓은 KafkaErrorHandlerTwo()의 이름을
* @KafkaListener(errorHandler="이곳에 입력해줍니다.")
* KafkaErrorHandlerTwo의 앞자리를 소문자로 입력해줍니다.
*
* <KafkaListenerErrorHandler 구현 위치>
* com.herojoon.kafkaproject.errorhandler.config.kafkaConfig의 KafkaErrorHandlerTwo()
*
* @param record
*/
@KafkaListener(topics = "dev-topic3", errorHandler = "kafkaErrorHandlerTwo")
@SendTo("dev-dead-topic")
public void errorTestListenerTow(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("### record: " + record.toString());
log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
throw new KafkaException("무슨무슨 에러가 발생하였다..!! Two"); // KafkaListenerErrorHandler를 테스트하기 위해 에러를 발생시킵니다.
}
/**
* 실패한 Kafka 메시지를 전달받아 처리하는 Consumer
*
* @param record
* @param acknowledgment
*/
@KafkaListener(topics = "dev-dead-topic")
public void deadTopicConsumer(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("### [dead-topic] record: " + record.toString());
log.info("### [dead-topic] topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
// kafka 메시지 읽어온 곳까지 commit. (이 부분을 하지 않으면 메시지를 소비했다고 commit 된 것이 아니므로 계속 메시지를 읽어온다)
acknowledgment.acknowledge();
}
}
// error handler 코드 (방법1)
package com.herojoon.kafkaproject.errorhandler.exception;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaErrorHandler implements KafkaListenerErrorHandler {
private final KafkaTemplate<String, String> kafkaTemplate;
private static String DEAD_TOPIC_NAME = "dev-dead-topic";
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
/**
* kafka error 발생 시 처리
*
* <kafka error 발생 시 처리 방법>
* 에러 발생 시 로그를 기록하거나 error처리를 위한 kafka topic으로 재전송하는 동작을 할 수 있습니다.
*
* @param message
* @param exception
* @param consumer
* @return
*/
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
/**
* error 로그 기록
*/
log.error("[KafkaErrorHandler] kafkaMessage=[" + message.getPayload() + "], errorMessage=[" + exception.getMessage() + "]");
/**
* 1) 원하는 메시지 값을 뽑아서 비교 처리나 조건 처리를 할 수도 있습니다.
* 2) 혹은 원하는 내용만 error 로그로 기록할 수 있습니다.
* 3) 혹은 실패 메시지 kafka topic으로 재전송 (실패 메시지를 처리할 dead topic을 별도로 생성해놓고 실패 메시지를 전송하여 처리하도록 합니다.)
*/
ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) message.getPayload();
// 1) 원하는 메시지 값을 뽑아서 비교 처리나 조건 처리를 할 수도 있습니다.
if (record.key() == "my key") {
// 처리
}
// 2) 혹은 원하는 내용만 error 로그로 기록할 수 있습니다.
log.error("[KafkaErrorHandler] topic=[" + record.topic() + "], value=[" + record.value() + "]");
// 3) 혹은 실패 메시지 kafka topic으로 재전송 (실패 메시지를 처리할 dead topic을 별도로 생성해놓고 실패 메시지를 전송하여 처리하도록 합니다.)
kafkaTemplate.send(DEAD_TOPIC_NAME, record.value());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync(offsets); // offset commit. (메시지 처리한 곳 표시해줌.)
return null;
}
}
// error handler 코드 (방법2)
package com.herojoon.kafkaproject.errorhandler.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
@Slf4j
@EnableKafka
@Configuration
public class KafkaConfig {
/**
* kafka error 발생 시 처리
*
* <kafka error 발생 시 처리 방법>
* 에러 발생 시 로그를 기록하거나 error처리를 위한 kafka topic으로 재전송하는 동작을 할 수 있습니다.
*
* @return
*/
@Bean
public KafkaListenerErrorHandler kafkaErrorHandlerTwo() {
return (m, e) -> {
/**
* error 로그 기록
*/
log.error("[KafkaErrorHandler] kafkaMessage=[" + m.getPayload() + "], errorMessage=[" + e.getMessage() + "]");
ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) m.getPayload();
// 메시지를 더 가공하거나 별도 처리를 하고..
return record.value(); // sendTo("토픽명")에 입력된 토픽으로 전달 될 메시지 내용
};
}
}
반응형
'Kafka' 카테고리의 다른 글
kafka 메시지 보내기 (0) | 2022.04.06 |
---|---|
windows에 kafka ui 설치하기 (2) | 2022.03.26 |
windows에 kafka 설치하기 (0) | 2022.03.26 |
Comments