Recent Posts
Recent Comments
HEROJOON 블로그(히로블)
kafka 메시지 보내기 본문
반응형
환경
- Framework : Spring Boot 2.6.6
- Build : Gradle 6.5.1
- JDK : JDK11
- 로컬 환경에 kafka 미리 설치 및 실행하고 아래 코드 테스트 진행하였습니다.
- kafka 설치 참고: https://herojoon-dev.tistory.com/118
목표
kafka producer로 메시지를 보내고 consumer로 메시지 받아서 소비하기
해보기 요약
1. build.gradle에 kafka lib 추가
2. application.yaml에 kafka 연결 정보 추가
3. producer, comsumer 코드 작성
해보기
- 전체 예제 코드: https://github.com/herojoon/kafka-project 의 sendmassage package 코드 참고.
// build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka' // kafka 추가
testImplementation 'org.springframework.kafka:spring-kafka-test' // kafka 추가
}
// application.yaml
// group-id 값이 필수 입력으로 변경되었기 때문에 꼭 입력해주셔야 합니다.
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: all
listener:
ack-mode: MANUAL_IMMEDIATE
type: SINGLE
consumer:
bootstrap-servers: localhost:9092
group-id: dev-group
// producer 코드
package com.herojoon.kafkaproject.producer;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Kafka Producer 테스트를 위한 Controller
*/
@RequestMapping("kafka")
@RestController
@RequiredArgsConstructor
public class MessageProducer {
// Spring application.yaml에 정의한 kafka 설정이 주입된 kafkaTemplate
private final KafkaTemplate<String, String> kafkaTemplate;
private static String TOPIC_NAME = "dev-topic";
/**
* Kafka Producer
* Kafka로 메시지를 전달하는 역할
* @return
*/
@GetMapping("producer")
public String sendMessage() {
String messageData = "kafka message";
kafkaTemplate.send(TOPIC_NAME, messageData);
return "success.";
}
}
// consumer 코드
package com.herojoon.kafkaproject.sendmessage.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageListener {
/**
* Kafka Listener
* Kafka에서 메시지를 읽어들이는 역할
*
* <개념>
* Topic: 메시지 데이터의 구분을 할 수 있는 논리적 개념
* Offset: Kafka Message의 고유번호. consumer에서 메시지를 어디까지 읽었는지 확인하는 용도로 쓰임
*
* <log 예시>
* ### record: ConsumerRecord(topic = dev-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1649170434791, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = kafka message)
* ### topic: dev-topic, value: kafka message, offset: 1
* @param record
*/
@KafkaListener(topics = "dev-topic")
public void messageListener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("### record: " + record.toString());
log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());
// kafka 메시지 읽어온 곳까지 commit. (이 부분을 하지 않으면 메시지를 소비했다고 commit 된 것이 아니므로 계속 메시지를 읽어온다)
acknowledgment.acknowledge();
}
}
반응형
'Kafka' 카테고리의 다른 글
kafka error handler사용하기 (0) | 2022.04.07 |
---|---|
windows에 kafka ui 설치하기 (2) | 2022.03.26 |
windows에 kafka 설치하기 (0) | 2022.03.26 |
Comments