1. Kafka란?
Kafka란 분산 메시징 시스템으로, 대량의 데이터를 빠르고 안정적으로 처리할 수 있는 이벤트 스트리밍 플랫폼이다.
마이크로서비스 아키텍처에서 서비스 간의 비동기 통신을 위해 자주 사용된다.
특징
발행/구독 메시징 시스템
- 메시지를 Producer(생산자) 가 전송하면, Consumer(소비자) 가 구독(Subscribe)하여 가져가는 구조.
- 메시지를 토픽(Topic) 단위로 관리하여 여러 Consumer가 동시에 구독 가능.
고성능 & 확장성
- 분산 처리 기반이라 대량의 데이터를 빠르게 처리 가능.
- 여러 개의 브로커(Broker) 가 클러스터를 이루어 확장 가능.
내결함성
- 복제(Replication) 기능을 지원하여 데이터 손실 방지.
- 파티션(Partition) 을 사용하여 장애 발생 시 다른 브로커에서 처리 가능.
기본 용어
토픽(Topic)
- 메시지를 저장하는 논리적인 공간 (데이터베이스의 테이블과 유사).
- Producer는 특정 Topic에 메시지를 전송하고, Consumer는 Topic을 구독하여 메시지를 읽음.
파티션(Partition)
- 하나의 토픽을 여러 개의 파티션 으로 분할하여 데이터 저장.
- 같은 토픽이라도 여러 개의 브로커에서 병렬 처리 가능 → 성능 향상 & 확장성 증가.
브로커(Broker)
- Kafka 클러스터를 구성하는 서버.
- 여러 개의 브로커가 분산 환경에서 동작하며, 데이터 저장과 메시지 전달을 담당.
Producer(생산자)
- 메시지를 Kafka로 전송(Publish) 하는 역할.
- 특정 Topic으로 메시지를 보내고, Kafka는 이를 브로커 내의 파티션에 저장.
Consumer(소비자)
- Kafka에서 메시지를 구독(Subscribe)하여 가져가는 역할.
- Consumer Group 을 사용하면 여러 Consumer가 병렬로 메시지를 소비 가능.
Zookeeper
- Kafka 클러스터를 관리하는 역할.
- 브로커의 상태를 추적하고, 리더 선출(Leader Election) 등의 작업을 수행.
- Kafka 2.8부터 Zookeeper 없이 운영할 수 있는 KRaft(Kafka Raft) 도입!
2. Kafka 설치 및 실행
kafka & zookeeper는 VMware의 리눅스 가상머신 환경에서 실행
Spring Boot는 윈도우에서 실행
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
여기서 binary 파일의 주소를 복사해 wget으로 가상머신에 다운로드하고 압축을 풀어준다.
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar zxvf kafka_2.13-3.9.0.tgz
cd로 압축 푼 폴더의 bin으로 들어가서 아래 명령어를 실행한다. &를 붙이면 백그라운드에서 실행가능하다.
./zookeeper-server-start.sh ../config/zookeeper.properties &
다시 압축 푼 폴더의 config로 들어가서 vim으로 server.properties 편집해주자
vim server.properties
설정해줄 목록은 다음과 같다.
- listeners: Kafka 브로커가 수신할 주소와 포트 (Kafka 브로커가 실행될 IP와 포트)
- advertised.listeners: 외부에서 Kafka 브로커에 접속할 때 사용할 주소 (클라이언트가 이 주소를 알아야 접속 가능)
- zookeeper.connect: Kafka가 사용할 Zookeeper의 주소 (localhost:2181 등)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.kafka.broker.ip:9092
zookeeper.connect=localhost:2181
다했으면 아래 명령어로 kafka도 실행시켜준다.
./kafka-server-start.sh ../config/server.properties &
그럼 기본적인 kafka 설정은 끝났다.
3. Spring Boot 프로젝트 설정
1) 의존성 추가
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.2'
2) application.yml 설정
spring:
kafka:
bootstrap-servers: kafka서버ip:9092
consumer:
group-id: group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 만약 Json 객체를 보내고 싶다면 StringSerializer를 JsonSerializer로 바꿔주기
3) Kafka Producer 구성
package com.example.onlineheater.kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 메시지 보내는 메서드
public void sendMessage(String topic,String message) {
kafkaTemplate.send(topic, message);
}
}
4) Kafka Consumer 구성
package com.example.onlineheater.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public KafkaConsumer(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@KafkaListener(topics = "my-topic", groupId = "test-group")
public void consumeMessage(String message) {
// Kafka에서 받은 메시지를 WebSocket을 통해 모든 구독자에게 전송
System.out.println(message);
}
}
5) Controller로 요청 받아서 메시지 뿌려보기
package com.example.onlineheater.controller;
import com.example.onlineheater.kafka.KafkaProducer;
import com.example.onlineheater.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
// Kafka에 메시지를 보내는 API
@PostMapping("/send-message")
public String sendMessage(@RequestBody Message message) {
kafkaProducer.sendMessage(String.valueOf(message.getId()),message.getMessage());
return "메시지가 Kafka에 전송되었습니다.";
}
}
4. Kafka 동작 흐름
1. 클라이언트의 요청
2. Spring Boot 에서 Producer를 통해 Kafka에 메시지 전송
- KafkaTemplate을 이용해 Producer가 Kafka 브로커에 메시지 전송
3. kafka 브로커가 메시지를 해당 토픽에 저장
- 메시지는 Kafka 클러스터 내의 브로커(Broker) 로 전송됨.
- 브로커는 메시지를 파티션(Partition) 에 저장하고 로그(commit log) 로 관리.
4. Consumer가 메시지를 구독하여 처리
- Consumer 는 특정 토픽을 구독(Subscribe)하고 메시지를 수신.
- 메시지를 비동기적으로 가져와서 비즈니스 로직 수행.
'Spring' 카테고리의 다른 글
OpenFeign으로 API 호출 자동화하기 (0) | 2025.02.27 |
---|---|
낙관적 락과 비관적 락 (0) | 2025.02.26 |
Spring Boot + Gradle 멀티 모듈 프로젝트 설정 (0) | 2025.02.18 |
Spring Boot에서 웹소켓(WebSocket) 사용하기 (0) | 2025.02.17 |
Spring Boot에서 Presigned URL을 사용한 S3 업로드 (0) | 2025.02.14 |