Spring Boot에서 Kafka 설정하기!

1. Kafka란?

Kafka란 분산 메시징 시스템으로, 대량의 데이터를 빠르고 안정적으로 처리할 수 있는 이벤트 스트리밍 플랫폼이다.

마이크로서비스 아키텍처에서 서비스 간의 비동기 통신을 위해 자주 사용된다.

 

특징

발행/구독 메시징 시스템

 

  • 메시지를 Producer(생산자) 가 전송하면, Consumer(소비자) 가 구독(Subscribe)하여 가져가는 구조.
  • 메시지를 토픽(Topic) 단위로 관리하여 여러 Consumer가 동시에 구독 가능.

고성능 & 확장성

 

  • 분산 처리 기반이라 대량의 데이터를 빠르게 처리 가능.
  • 여러 개의 브로커(Broker)클러스터를 이루어 확장 가능.

 

내결함성

 

  • 복제(Replication) 기능을 지원하여 데이터 손실 방지.
  • 파티션(Partition) 을 사용하여 장애 발생 시 다른 브로커에서 처리 가능.

 

기본 용어

토픽(Topic)

  • 메시지를 저장하는 논리적인 공간 (데이터베이스의 테이블과 유사).
  • Producer는 특정 Topic에 메시지를 전송하고, Consumer는 Topic을 구독하여 메시지를 읽음.

파티션(Partition)

  • 하나의 토픽을 여러 개의 파티션 으로 분할하여 데이터 저장.
  • 같은 토픽이라도 여러 개의 브로커에서 병렬 처리 가능 → 성능 향상 & 확장성 증가.

브로커(Broker)

  • Kafka 클러스터를 구성하는 서버.
  • 여러 개의 브로커가 분산 환경에서 동작하며, 데이터 저장과 메시지 전달을 담당.

Producer(생산자)

  • 메시지를 Kafka로 전송(Publish) 하는 역할.
  • 특정 Topic으로 메시지를 보내고, Kafka는 이를 브로커 내의 파티션에 저장.

Consumer(소비자)

  • Kafka에서 메시지를 구독(Subscribe)하여 가져가는 역할.
  • Consumer Group 을 사용하면 여러 Consumer가 병렬로 메시지를 소비 가능.

Zookeeper

  • Kafka 클러스터를 관리하는 역할.
  • 브로커의 상태를 추적하고, 리더 선출(Leader Election) 등의 작업을 수행.
  • Kafka 2.8부터 Zookeeper 없이 운영할 수 있는 KRaft(Kafka Raft) 도입!

 

 

 

2. Kafka 설치 및 실행

kafka & zookeeper는 VMware의 리눅스 가상머신 환경에서 실행

Spring Boot는 윈도우에서 실행

 

 

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

여기서 binary 파일의 주소를 복사해 wget으로 가상머신에 다운로드하고 압축을 풀어준다.

wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar zxvf kafka_2.13-3.9.0.tgz

 

cd로 압축 푼 폴더의 bin으로 들어가서 아래 명령어를 실행한다. &를 붙이면 백그라운드에서 실행가능하다.

./zookeeper-server-start.sh ../config/zookeeper.properties &

 

다시 압축 푼 폴더의 config로 들어가서 vim으로 server.properties 편집해주자

vim server.properties

 

설정해줄 목록은 다음과 같다.

 

  • listeners: Kafka 브로커가 수신할 주소와 포트 (Kafka 브로커가 실행될 IP와 포트)
  • advertised.listeners: 외부에서 Kafka 브로커에 접속할 때 사용할 주소 (클라이언트가 이 주소를 알아야 접속 가능)
  • zookeeper.connect: Kafka가 사용할 Zookeeper의 주소 (localhost:2181 등)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.kafka.broker.ip:9092
zookeeper.connect=localhost:2181

 

 

 

다했으면 아래 명령어로 kafka도 실행시켜준다.

./kafka-server-start.sh ../config/server.properties &

 

그럼 기본적인 kafka 설정은 끝났다.

 

3. Spring Boot 프로젝트 설정

 

1) 의존성 추가

implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.2'

 

2) application.yml 설정

spring:
  kafka:
    bootstrap-servers: kafka서버ip:9092
    consumer:
      group-id: group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      
      # 만약 Json 객체를 보내고 싶다면 StringSerializer를 JsonSerializer로 바꿔주기

 

3) Kafka Producer 구성

package com.example.onlineheater.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    // 메시지 보내는 메서드
    public void sendMessage(String topic,String message) {
        kafkaTemplate.send(topic, message);
    }
}

4) Kafka Consumer 구성

package com.example.onlineheater.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private final SimpMessagingTemplate messagingTemplate;

    @Autowired
    public KafkaConsumer(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @KafkaListener(topics = "my-topic", groupId = "test-group")
    public void consumeMessage(String message) {
        // Kafka에서 받은 메시지를 WebSocket을 통해 모든 구독자에게 전송
        System.out.println(message);
    }
}

 

5) Controller로 요청 받아서 메시지 뿌려보기

package com.example.onlineheater.controller;

import com.example.onlineheater.kafka.KafkaProducer;
import com.example.onlineheater.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    // Kafka에 메시지를 보내는 API
    @PostMapping("/send-message")
    public String sendMessage(@RequestBody Message message) {
        kafkaProducer.sendMessage(String.valueOf(message.getId()),message.getMessage());
        return "메시지가 Kafka에 전송되었습니다.";
    }
}

 

 

4. Kafka 동작 흐름

1. 클라이언트의 요청

 

2. Spring Boot 에서 Producer를 통해 Kafka에 메시지 전송

  • KafkaTemplate을 이용해 Producer가 Kafka 브로커에 메시지 전송

3. kafka 브로커가 메시지를 해당 토픽에 저장

 

  • 메시지는 Kafka 클러스터 내의 브로커(Broker) 로 전송됨.
  • 브로커는 메시지를 파티션(Partition) 에 저장하고 로그(commit log) 로 관리.

4. Consumer가 메시지를 구독하여 처리

 

  • Consumer 는 특정 토픽을 구독(Subscribe)하고 메시지를 수신.
  • 메시지를 비동기적으로 가져와서 비즈니스 로직 수행.