본문 바로가기

개발/백엔드

Spring Redis - SortedSet으로 대기열 만들기

SortedSet을 사용하는 방식과 Stream을 이용하는 방식이 있음

Stream은 더 복잡하지만 메시지 손실 방지, 장애 복구, 처리 보장 등 장점이 있음

그래서 주로 트랜잭션이 필요한 더 중요한 곳에서 사용하는 듯

 

SortedSet은 score 기반 정렬(진입시간)과 rank 조회(순위)를 활용해 대기열을 구현하기 좋음

 

전체적인 구조

[Browser] → [Front Server(Next.js)] → [API Server] ←→ [Redis(Redisson)]

구조:
┌─────────────────┐       ┌─────────────────┐      ┌──────────────┐
│   Front Server  │ ──────│    API Server   │ ─────│     Redis    │
│    (Next.js)    │       │    (Spring)     │      │  (Redisson)  │
└─────────────────┘       └─────────────────┘      └──────────────┘

Redis 데이터 구조:
waiting:{productId} (Sorted Set)
processing:{productId} (String)
capacity:{productId} (String)

┌─────────────────────────────┐
│ waiting:product1            │   ┌─────────────────────────────┐
├─────────────────────────────┤   │ processing:product1         │
│ Score      | Value          │   ├─────────────────────────────┤
│ 1703304001 | session1       │   │ - session4                  │
│ 1703304002 | session2       │   │ - session5                  │
│ 1703304003 | session3       │   │ - session6                  │
└─────────────────────────────┘   └─────────────────────────────┘
┌─────────────────────────────┐
│ capacity:product1           │
├─────────────────────────────┤
│ 100                         │
└─────────────────────────────┘

1. 최초 접근
[Browser] → GET /products/1
[Front]   → GET /api/products/1/queue
[API]     → 대기열 진입
Response  ← { "state": "WAITING", "position": 3 }

2. 상태 확인 (5초마다)
[Browser] → GET /api/products/1/queue
Response  ← { "state": "WAITING", "position": 1 }
Response  ← { "state": "ALLOWED", "position": 0 }

3. ALLOWED 시 상품 정보 요청
[Browser] → GET /api/products/1
Response  ← { productId: "1", name: "상품명", ... }

 

 

구현 코드

 

1. 컨트롤러

@RestController
@RequestMapping("/api")
@Slf4j
public class QueueController {
    
    private final QueueService queueService;
    
    public QueueController(QueueService queueService) {
        this.queueService = queueService;
    }
    
    @GetMapping("/products/{productId}/queue")
    public ResponseEntity<QueueCheckResponse> checkQueue(
            @PathVariable String productId,
            HttpSession session) {
        try {
            String userId = session.getId();
            QueueCheckResponse response = queueService.checkQueueStatus(productId, userId);
            return ResponseEntity.ok(response);
        } catch (QueueException e) {
            log.warn("Queue check failed: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
                .body(new QueueCheckResponse(QueueState.ERROR, -1L));
        } catch (Exception e) {
            log.error("Queue check failed for productId: {}", productId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new QueueCheckResponse(QueueState.ERROR, -1L));
        }
    }
}

 

2. 서비스

@Service
@Slf4j
public class QueueService {
    private final StringRedisTemplate redisTemplate;
    
    private static final String WAITING_PREFIX = "waiting:";
    private static final String PROCESSING_PREFIX = "processing:";
    private static final String CAPACITY_PREFIX = "capacity:";
    private static final int DEFAULT_CAPACITY = 100;
    
    public QueueService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public QueueCheckResponse checkQueueStatus(String productId, String userId) {
        String waitingKey = WAITING_PREFIX + productId;
        String processingKey = PROCESSING_PREFIX + productId;
        
        // 이미 처리 중인 사용자 확인
        Boolean isProcessing = redisTemplate.opsForSet().isMember(processingKey, userId);
        if (Boolean.TRUE.equals(isProcessing)) {
            return new QueueCheckResponse(QueueState.ALLOWED, 0L);
        }
        
        // 대기열 순서 확인
        Long position = redisTemplate.opsForZSet().rank(waitingKey, userId);
        
        // 신규 대기열 진입
        if (position == null) {
            redisTemplate.opsForZSet().add(waitingKey, userId, System.currentTimeMillis());
            position = redisTemplate.opsForZSet().rank(waitingKey, userId);
        }
        
        // 입장 가능 여부 확인
        if (canEnter(productId)) {
            promoteUser(productId, userId);
            return new QueueCheckResponse(QueueState.ALLOWED, 0L);
        }
        
        return new QueueCheckResponse(QueueState.WAITING, position);
    }

    @Scheduled(fixedRate = 1000)
    public void processQueue() {
        Set<String> waitingKeys = redisTemplate.keys(WAITING_PREFIX + "*");
        if (waitingKeys == null) return;
        
        for (String waitingKey : waitingKeys) {
            try {
                String productId = waitingKey.substring(WAITING_PREFIX.length());
                processProductQueue(productId);
            } catch (Exception e) {
                log.error("대기열 처리 실패: {}", waitingKey, e);
            }
        }
    }

    private void processProductQueue(String productId) {
        String processingKey = PROCESSING_PREFIX + productId;
        String waitingKey = WAITING_PREFIX + productId;
        
        Long currentProcessing = redisTemplate.opsForSet().size(processingKey);
        String capacityStr = redisTemplate.opsForValue().get(CAPACITY_PREFIX + productId);
        int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;
        
        int available = capacity - (currentProcessing != null ? currentProcessing.intValue() : 0);
        
        if (available > 0) {
            // 대기열에서 처리 가능한 만큼 사용자 가져오기
            Set<String> usersToPromote = redisTemplate.opsForZSet()
                .range(waitingKey, 0, available - 1);
            
            if (usersToPromote != null && !usersToPromote.isEmpty()) {
                for (String userId : usersToPromote) {
                    try {
                        promoteUser(productId, userId);
                    } catch (Exception e) {
                        log.error("사용자 상태 변경 실패: {} - {}", productId, userId, e);
                    }
                }
                
                log.info("상품 {}: {}명의 사용자를 대기열에서 처리 중으로 이동", 
                    productId, usersToPromote.size());
            }
        }
    }

    private void promoteUser(String productId, String userId) {
        String processingKey = PROCESSING_PREFIX + productId;
        String waitingKey = WAITING_PREFIX + productId;
        
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            public List<Object> execute(RedisOperations operations) {
                operations.multi();
                operations.opsForSet().add(processingKey, userId);
                operations.opsForZSet().remove(waitingKey, userId);
                // 30분 후 만료
                operations.expire(processingKey, 30, TimeUnit.MINUTES);
                return operations.exec();
            }
        });
    }

    private boolean canEnter(String productId) {
        String processingKey = PROCESSING_PREFIX + productId;
        String capacityKey = CAPACITY_PREFIX + productId;
        
        Long currentProcessing = redisTemplate.opsForSet().size(processingKey);
        String capacityStr = redisTemplate.opsForValue().get(capacityKey);
        int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;
        
        return currentProcessing < capacity;
    }

    // 관리자용 메서드
    public QueueStats getQueueStats(String productId) {
        String waitingKey = WAITING_PREFIX + productId;
        String processingKey = PROCESSING_PREFIX + productId;
        String capacityKey = CAPACITY_PREFIX + productId;

        Long waitingCount = redisTemplate.opsForZSet().size(waitingKey);
        Long processingCount = redisTemplate.opsForSet().size(processingKey);
        String capacityStr = redisTemplate.opsForValue().get(capacityKey);
        int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;

        return new QueueStats(
            waitingCount != null ? waitingCount : 0,
            processingCount != null ? processingCount : 0,
            capacity
        );
    }

    public void updateCapacity(String productId, long capacity) {
        redisTemplate.opsForValue().set(CAPACITY_PREFIX + productId, String.valueOf(capacity));
    }
}

'개발 > 백엔드' 카테고리의 다른 글

Spring Circuit Breaker  (0) 2024.12.30
Spring Rate Limiter  (1) 2024.12.26
Spring Kafka 기본 설정 및 실행  (2) 2024.12.25
Kafka 기본 개념과 구조  (1) 2024.12.25
Spring Redis 기본 설정  (1) 2024.12.23