본문 바로가기

개발/백엔드

Spring Kafka 기본 설정 및 실행

도커에 카프카 설치, 스프링에 연동해보자

도커 데스크탑 설치가 되어 있어야 함

 

1. docker-compose.yml 생성 (아무데나 해도 되지만 프로젝트 or 도커 작업 경로에 두는게 좋음)

[Kafka Cluster]
└── Broker 1 (단일 브로커)
    └── Topic: test-topic
        └── Partition 0 (파티션 1개)
        
        
// docker-compose.yml
version: '3'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:19092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: 'LlP5PCzPR4iuUNXJNOXo7Q'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    volumes:
      - kafka_data:/var/lib/kafka/data

volumes:
  kafka_data:



2. yml 파일이 있는 위치에서 powershell로 docker compose up -d 실행

 

도커 테스크탑에서 실행중인 카프카 확인

 

 

3. Spring Boot 3 설정

spring:
  kafka:
    # Kafka 브로커 서버 주소
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group # 컨슈머 그룹 ID (같은 그룹은 메시지를 분산 처리)
      # 새로운 컨슈머 그룹이 생성될 때 어디서부터 메시지를 읽을지 설정
      # earliest: 가장 처음부터 / latest: 가장 최근부터
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        enable.auto.commit: true
        auto.commit.interval.ms: 100 # 오프셋 커밋 간격 (ms)
        session.timeout.ms: 45000 # 세션 타임아웃 시간 (ms)
        allow.auto.create.topics: true # 토픽이 없을 경우 자동 생성 허용
        partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor # 파티션 할당 전략 (RoundRobin: 균등 분배)
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

logging:
  level:
    org.apache.kafka: INFO
    org.springframework.kafka: INFO
    ## 패키지 이름
    com.blog: DEBUG

 

 

4. 카프카 코드 작성

@Configuration
public class KafkaTopicConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    /**
     * 애플리케이션 시작 시 자동으로 토픽을 생성하는 Bean
     * @return 새로 생성된 토픽 정보
     */
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test-topic")
         
         .partitions(1)    // 파티션 수
                .replicas(1)      // 복제 팩터 수
                .build();
    }
}


@Slf4j
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducer producer;

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody String message) {
        log.info("=== 컨트롤러에서 메시지 수신: {} ===", message);
        producer.sendMessage("test-topic", message);
        return ResponseEntity.ok("메시지 발행 요청 완료");
    }
}


@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        log.info("=== 프로듀서 메시지 발행 시작 ===");
        log.info("토픽: {}", topic);
        log.info("메시지: {}", message);

        kafkaTemplate.send(topic, message)
                .thenAccept(result -> {
                    log.info("=== 메시지 발행 성공 ===");
                    log.info("토픽: {}", result.getRecordMetadata().topic());
                    log.info("파티션: {}", result.getRecordMetadata().partition());
                    log.info("오프셋: {}", result.getRecordMetadata().offset());
                    log.info("타임스탬프: {}", result.getRecordMetadata().timestamp());
                })
                .exceptionally(e -> {
                    log.error("=== 메시지 발행 실패 ===", e);
                    return null;
                });
    }
}

@Slf4j
@Service
@EnableKafka
public class KafkaConsumer {
    @KafkaListener(
            topics = "test-topic",
            groupId = "my-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void listen(
            String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset
    ) {
        try {
            log.info("=== 컨슈머 메시지 수신 ===");
            log.info("토픽: {}", topic);
            log.info("파티션: {}", partition);
            log.info("오프셋: {}", offset);
            log.info("메시지: {}", message);

            // 메시지 처리 로직
            log.info("=== 메시지 처리 완료 ===");
        } catch (Exception e) {
            log.error("메시지 처리 중 에러 발생", e);
            throw e;
        }
    }
}


@Slf4j
@ControllerAdvice
public class KafkaExceptionHandler {

    /**
     * Kafka 프로듀서 관련 예외 처리
     */
    @ExceptionHandler(KafkaProducerException.class)
    public ResponseEntity<String> handleKafkaProducerException(KafkaProducerException e) {
        log.error("Kafka 프로듀서 예외 발생", e);
        return ResponseEntity
                .status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("메시지 발행 중 오류 발생: " + e.getMessage());
    }

    /**
     * Kafka 타임아웃 예외 처리
     */
    @ExceptionHandler(TimeoutException.class)
    public ResponseEntity<String> handleTimeoutException(TimeoutException e) {
        log.error("Kafka 타임아웃 발생", e);
        return ResponseEntity
                .status(HttpStatus.REQUEST_TIMEOUT)
                .body("Kafka 처리 시간 초과: " + e.getMessage());
    }

    /**
     * 메시지 유효성 검증 실패 처리
     */
    @ExceptionHandler(MethodArgumentNotValidException.class)
    public ResponseEntity<String> handleValidationException(MethodArgumentNotValidException e) {
        log.error("메시지 유효성 검증 실패", e);
        return ResponseEntity
                .status(HttpStatus.BAD_REQUEST)
                .body("잘못된 메시지 형식: " + e.getMessage());
    }

    /**
     * 기타 Kafka 관련 예외 처리
     */
    @ExceptionHandler(org.springframework.kafka.KafkaException.class)
    public ResponseEntity<String> handleKafkaException(org.springframework.kafka.KafkaException e) {
        log.error("Kafka 예외 발생", e);
        return ResponseEntity
                .status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Kafka 처리 중 오류 발생: " + e.getMessage());
    }
}

 

 

5. 토픽에 메세지 보내기

 

curl -X POST http://localhost:8080/api/kafka/publish -H "Content-Type: application/json" -d "테스트 메시지"

 

 

6. 카프카에서 직접 보고 싶다면

-- 토픽 리스트 확인
docker exec -it kafka /bin/kafka-topics --bootstrap-server localhost:9092 --list

-- 토픽 상세 정보 확인
docker exec -it kafka /bin/kafka-topics --bootstrap-server localhost:9092 --describe --topic test-topic

-- 터미널1 (컨슈머 켜놓기)
docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

-- 터미널2 (프로듀서로 메시지 보내기)
docker exec -it kafka /bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic

 

'개발 > 백엔드' 카테고리의 다른 글

Spring Circuit Breaker  (0) 2024.12.30
Spring Rate Limiter  (1) 2024.12.26
Kafka 기본 개념과 구조  (1) 2024.12.25
Spring Redis - SortedSet으로 대기열 만들기  (1) 2024.12.23
Spring Redis 기본 설정  (1) 2024.12.23