HEROJOON 블로그(히로블)

kafka 메시지 보내기 본문

Kafka

kafka 메시지 보내기

herojoon 2022. 4. 6. 00:36
반응형

환경

  • Framework : Spring Boot 2.6.6
  • Build : Gradle 6.5.1
  • JDK : JDK11
  • 로컬 환경에 kafka 미리 설치 및 실행하고 아래 코드 테스트 진행하였습니다.
 

windows에 kafka 설치하기

0. 요약 저의 설치 환경: Windows 11 1) windows 설치를 위한 kafka 다운로드 받습니다. 2) kafka 폴더에는 zookeeper, kafka 실행을 위한 bat형식의 배치파일과 zookeeper, kafka config파일이 들어 있습니다. 3)..

herojoon-dev.tistory.com

 

목표

kafka producer로 메시지를 보내고 consumer로 메시지 받아서 소비하기

 

해보기 요약

1. build.gradle에 kafka lib 추가

2. application.yaml에 kafka 연결 정보 추가

3. producer, comsumer 코드 작성

 

해보기

// build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'  // kafka 추가
    testImplementation 'org.springframework.kafka:spring-kafka-test'  // kafka 추가
}

 

// application.yaml
// group-id 값이 필수 입력으로 변경되었기 때문에 꼭 입력해주셔야 합니다.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
    listener:
      ack-mode: MANUAL_IMMEDIATE
      type: SINGLE
    consumer:
      bootstrap-servers: localhost:9092
      group-id: dev-group

 

// producer 코드

package com.herojoon.kafkaproject.producer;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka Producer 테스트를 위한 Controller
 */
@RequestMapping("kafka")
@RestController
@RequiredArgsConstructor
public class MessageProducer {
    // Spring application.yaml에 정의한 kafka 설정이 주입된 kafkaTemplate
    private final KafkaTemplate<String, String> kafkaTemplate;

    private static String TOPIC_NAME = "dev-topic";

    /**
     * Kafka Producer
     * Kafka로 메시지를 전달하는 역할
     * @return
     */
    @GetMapping("producer")
    public String sendMessage() {
        String messageData = "kafka message";
        kafkaTemplate.send(TOPIC_NAME, messageData);
        return "success.";
    }
}

 

// consumer 코드

package com.herojoon.kafkaproject.sendmessage.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    /**
     * Kafka Listener
     * Kafka에서 메시지를 읽어들이는 역할
     *
     * <개념>
     * Topic: 메시지 데이터의 구분을 할 수 있는 논리적 개념
     * Offset: Kafka Message의 고유번호. consumer에서 메시지를 어디까지 읽었는지 확인하는 용도로 쓰임
     *
     * <log 예시>
     * ### record: ConsumerRecord(topic = dev-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1649170434791, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = kafka message)
     * ### topic: dev-topic, value: kafka message, offset: 1
     * @param record
     */
    @KafkaListener(topics = "dev-topic")
    public void messageListener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("### record: " + record.toString());
        log.info("### topic: " + record.topic() + ", value: " + record.value() + ", offset: " + record.offset());

        // kafka 메시지 읽어온 곳까지 commit. (이 부분을 하지 않으면 메시지를 소비했다고 commit 된 것이 아니므로 계속 메시지를 읽어온다)
        acknowledgment.acknowledge();
    }
}
반응형

'Kafka' 카테고리의 다른 글

kafka error handler사용하기  (0) 2022.04.07
windows에 kafka ui 설치하기  (0) 2022.03.26
windows에 kafka 설치하기  (0) 2022.03.26
Comments