도커에 카프카 설치, 스프링에 연동해보자
도커 데스크탑 설치가 되어 있어야 함
1. docker-compose.yml 생성 (아무데나 해도 되지만 프로젝트 or 도커 작업 경로에 두는게 좋음)
[Kafka Cluster]
└── Broker 1 (단일 브로커)
└── Topic: test-topic
└── Partition 0 (파티션 1개)
// docker-compose.yml
version: '3'
services:
kafka:
image: confluentinc/cp-kafka:7.5.1
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:19092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:19092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: 'LlP5PCzPR4iuUNXJNOXo7Q'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- kafka_data:/var/lib/kafka/data
volumes:
kafka_data:
2. yml 파일이 있는 위치에서 powershell로 docker compose up -d 실행
도커 테스크탑에서 실행중인 카프카 확인
3. Spring Boot 3 설정
spring:
kafka:
# Kafka 브로커 서버 주소
bootstrap-servers: localhost:9092
consumer:
group-id: my-group # 컨슈머 그룹 ID (같은 그룹은 메시지를 분산 처리)
# 새로운 컨슈머 그룹이 생성될 때 어디서부터 메시지를 읽을지 설정
# earliest: 가장 처음부터 / latest: 가장 최근부터
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
enable.auto.commit: true
auto.commit.interval.ms: 100 # 오프셋 커밋 간격 (ms)
session.timeout.ms: 45000 # 세션 타임아웃 시간 (ms)
allow.auto.create.topics: true # 토픽이 없을 경우 자동 생성 허용
partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor # 파티션 할당 전략 (RoundRobin: 균등 분배)
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
logging:
level:
org.apache.kafka: INFO
org.springframework.kafka: INFO
## 패키지 이름
com.blog: DEBUG
4. 카프카 코드 작성
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
/**
* 애플리케이션 시작 시 자동으로 토픽을 생성하는 Bean
* @return 새로 생성된 토픽 정보
*/
@Bean
public NewTopic testTopic() {
return TopicBuilder.name("test-topic")
.partitions(1) // 파티션 수
.replicas(1) // 복제 팩터 수
.build();
}
}
@Slf4j
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducer producer;
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody String message) {
log.info("=== 컨트롤러에서 메시지 수신: {} ===", message);
producer.sendMessage("test-topic", message);
return ResponseEntity.ok("메시지 발행 요청 완료");
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
log.info("=== 프로듀서 메시지 발행 시작 ===");
log.info("토픽: {}", topic);
log.info("메시지: {}", message);
kafkaTemplate.send(topic, message)
.thenAccept(result -> {
log.info("=== 메시지 발행 성공 ===");
log.info("토픽: {}", result.getRecordMetadata().topic());
log.info("파티션: {}", result.getRecordMetadata().partition());
log.info("오프셋: {}", result.getRecordMetadata().offset());
log.info("타임스탬프: {}", result.getRecordMetadata().timestamp());
})
.exceptionally(e -> {
log.error("=== 메시지 발행 실패 ===", e);
return null;
});
}
}
@Slf4j
@Service
@EnableKafka
public class KafkaConsumer {
@KafkaListener(
topics = "test-topic",
groupId = "my-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(
String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
try {
log.info("=== 컨슈머 메시지 수신 ===");
log.info("토픽: {}", topic);
log.info("파티션: {}", partition);
log.info("오프셋: {}", offset);
log.info("메시지: {}", message);
// 메시지 처리 로직
log.info("=== 메시지 처리 완료 ===");
} catch (Exception e) {
log.error("메시지 처리 중 에러 발생", e);
throw e;
}
}
}
@Slf4j
@ControllerAdvice
public class KafkaExceptionHandler {
/**
* Kafka 프로듀서 관련 예외 처리
*/
@ExceptionHandler(KafkaProducerException.class)
public ResponseEntity<String> handleKafkaProducerException(KafkaProducerException e) {
log.error("Kafka 프로듀서 예외 발생", e);
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("메시지 발행 중 오류 발생: " + e.getMessage());
}
/**
* Kafka 타임아웃 예외 처리
*/
@ExceptionHandler(TimeoutException.class)
public ResponseEntity<String> handleTimeoutException(TimeoutException e) {
log.error("Kafka 타임아웃 발생", e);
return ResponseEntity
.status(HttpStatus.REQUEST_TIMEOUT)
.body("Kafka 처리 시간 초과: " + e.getMessage());
}
/**
* 메시지 유효성 검증 실패 처리
*/
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<String> handleValidationException(MethodArgumentNotValidException e) {
log.error("메시지 유효성 검증 실패", e);
return ResponseEntity
.status(HttpStatus.BAD_REQUEST)
.body("잘못된 메시지 형식: " + e.getMessage());
}
/**
* 기타 Kafka 관련 예외 처리
*/
@ExceptionHandler(org.springframework.kafka.KafkaException.class)
public ResponseEntity<String> handleKafkaException(org.springframework.kafka.KafkaException e) {
log.error("Kafka 예외 발생", e);
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Kafka 처리 중 오류 발생: " + e.getMessage());
}
}
5. 토픽에 메세지 보내기
curl -X POST http://localhost:8080/api/kafka/publish -H "Content-Type: application/json" -d "테스트 메시지"
6. 카프카에서 직접 보고 싶다면
-- 토픽 리스트 확인
docker exec -it kafka /bin/kafka-topics --bootstrap-server localhost:9092 --list
-- 토픽 상세 정보 확인
docker exec -it kafka /bin/kafka-topics --bootstrap-server localhost:9092 --describe --topic test-topic
-- 터미널1 (컨슈머 켜놓기)
docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
-- 터미널2 (프로듀서로 메시지 보내기)
docker exec -it kafka /bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
'개발 > 백엔드' 카테고리의 다른 글
Spring Circuit Breaker (0) | 2024.12.30 |
---|---|
Spring Rate Limiter (1) | 2024.12.26 |
Kafka 기본 개념과 구조 (1) | 2024.12.25 |
Spring Redis - SortedSet으로 대기열 만들기 (1) | 2024.12.23 |
Spring Redis 기본 설정 (1) | 2024.12.23 |