SortedSet을 사용하는 방식과 Stream을 이용하는 방식이 있음
Stream은 더 복잡하지만 메시지 손실 방지, 장애 복구, 처리 보장 등 장점이 있음
그래서 주로 트랜잭션이 필요한 더 중요한 곳에서 사용하는 듯
SortedSet은 score 기반 정렬(진입시간)과 rank 조회(순위)를 활용해 대기열을 구현하기 좋음
전체적인 구조
[Browser] → [Front Server(Next.js)] → [API Server] ←→ [Redis(Redisson)]
구조:
┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Front Server │ ──────│ API Server │ ─────│ Redis │
│ (Next.js) │ │ (Spring) │ │ (Redisson) │
└─────────────────┘ └─────────────────┘ └──────────────┘
Redis 데이터 구조:
waiting:{productId} (Sorted Set)
processing:{productId} (String)
capacity:{productId} (String)
┌─────────────────────────────┐
│ waiting:product1 │ ┌─────────────────────────────┐
├─────────────────────────────┤ │ processing:product1 │
│ Score | Value │ ├─────────────────────────────┤
│ 1703304001 | session1 │ │ - session4 │
│ 1703304002 | session2 │ │ - session5 │
│ 1703304003 | session3 │ │ - session6 │
└─────────────────────────────┘ └─────────────────────────────┘
┌─────────────────────────────┐
│ capacity:product1 │
├─────────────────────────────┤
│ 100 │
└─────────────────────────────┘
1. 최초 접근
[Browser] → GET /products/1
[Front] → GET /api/products/1/queue
[API] → 대기열 진입
Response ← { "state": "WAITING", "position": 3 }
2. 상태 확인 (5초마다)
[Browser] → GET /api/products/1/queue
Response ← { "state": "WAITING", "position": 1 }
Response ← { "state": "ALLOWED", "position": 0 }
3. ALLOWED 시 상품 정보 요청
[Browser] → GET /api/products/1
Response ← { productId: "1", name: "상품명", ... }
구현 코드
1. 컨트롤러
@RestController
@RequestMapping("/api")
@Slf4j
public class QueueController {
private final QueueService queueService;
public QueueController(QueueService queueService) {
this.queueService = queueService;
}
@GetMapping("/products/{productId}/queue")
public ResponseEntity<QueueCheckResponse> checkQueue(
@PathVariable String productId,
HttpSession session) {
try {
String userId = session.getId();
QueueCheckResponse response = queueService.checkQueueStatus(productId, userId);
return ResponseEntity.ok(response);
} catch (QueueException e) {
log.warn("Queue check failed: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body(new QueueCheckResponse(QueueState.ERROR, -1L));
} catch (Exception e) {
log.error("Queue check failed for productId: {}", productId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new QueueCheckResponse(QueueState.ERROR, -1L));
}
}
}
2. 서비스
@Service
@Slf4j
public class QueueService {
private final StringRedisTemplate redisTemplate;
private static final String WAITING_PREFIX = "waiting:";
private static final String PROCESSING_PREFIX = "processing:";
private static final String CAPACITY_PREFIX = "capacity:";
private static final int DEFAULT_CAPACITY = 100;
public QueueService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public QueueCheckResponse checkQueueStatus(String productId, String userId) {
String waitingKey = WAITING_PREFIX + productId;
String processingKey = PROCESSING_PREFIX + productId;
// 이미 처리 중인 사용자 확인
Boolean isProcessing = redisTemplate.opsForSet().isMember(processingKey, userId);
if (Boolean.TRUE.equals(isProcessing)) {
return new QueueCheckResponse(QueueState.ALLOWED, 0L);
}
// 대기열 순서 확인
Long position = redisTemplate.opsForZSet().rank(waitingKey, userId);
// 신규 대기열 진입
if (position == null) {
redisTemplate.opsForZSet().add(waitingKey, userId, System.currentTimeMillis());
position = redisTemplate.opsForZSet().rank(waitingKey, userId);
}
// 입장 가능 여부 확인
if (canEnter(productId)) {
promoteUser(productId, userId);
return new QueueCheckResponse(QueueState.ALLOWED, 0L);
}
return new QueueCheckResponse(QueueState.WAITING, position);
}
@Scheduled(fixedRate = 1000)
public void processQueue() {
Set<String> waitingKeys = redisTemplate.keys(WAITING_PREFIX + "*");
if (waitingKeys == null) return;
for (String waitingKey : waitingKeys) {
try {
String productId = waitingKey.substring(WAITING_PREFIX.length());
processProductQueue(productId);
} catch (Exception e) {
log.error("대기열 처리 실패: {}", waitingKey, e);
}
}
}
private void processProductQueue(String productId) {
String processingKey = PROCESSING_PREFIX + productId;
String waitingKey = WAITING_PREFIX + productId;
Long currentProcessing = redisTemplate.opsForSet().size(processingKey);
String capacityStr = redisTemplate.opsForValue().get(CAPACITY_PREFIX + productId);
int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;
int available = capacity - (currentProcessing != null ? currentProcessing.intValue() : 0);
if (available > 0) {
// 대기열에서 처리 가능한 만큼 사용자 가져오기
Set<String> usersToPromote = redisTemplate.opsForZSet()
.range(waitingKey, 0, available - 1);
if (usersToPromote != null && !usersToPromote.isEmpty()) {
for (String userId : usersToPromote) {
try {
promoteUser(productId, userId);
} catch (Exception e) {
log.error("사용자 상태 변경 실패: {} - {}", productId, userId, e);
}
}
log.info("상품 {}: {}명의 사용자를 대기열에서 처리 중으로 이동",
productId, usersToPromote.size());
}
}
}
private void promoteUser(String productId, String userId) {
String processingKey = PROCESSING_PREFIX + productId;
String waitingKey = WAITING_PREFIX + productId;
redisTemplate.execute(new SessionCallback<List<Object>>() {
@Override
public List<Object> execute(RedisOperations operations) {
operations.multi();
operations.opsForSet().add(processingKey, userId);
operations.opsForZSet().remove(waitingKey, userId);
// 30분 후 만료
operations.expire(processingKey, 30, TimeUnit.MINUTES);
return operations.exec();
}
});
}
private boolean canEnter(String productId) {
String processingKey = PROCESSING_PREFIX + productId;
String capacityKey = CAPACITY_PREFIX + productId;
Long currentProcessing = redisTemplate.opsForSet().size(processingKey);
String capacityStr = redisTemplate.opsForValue().get(capacityKey);
int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;
return currentProcessing < capacity;
}
// 관리자용 메서드
public QueueStats getQueueStats(String productId) {
String waitingKey = WAITING_PREFIX + productId;
String processingKey = PROCESSING_PREFIX + productId;
String capacityKey = CAPACITY_PREFIX + productId;
Long waitingCount = redisTemplate.opsForZSet().size(waitingKey);
Long processingCount = redisTemplate.opsForSet().size(processingKey);
String capacityStr = redisTemplate.opsForValue().get(capacityKey);
int capacity = capacityStr != null ? Integer.parseInt(capacityStr) : DEFAULT_CAPACITY;
return new QueueStats(
waitingCount != null ? waitingCount : 0,
processingCount != null ? processingCount : 0,
capacity
);
}
public void updateCapacity(String productId, long capacity) {
redisTemplate.opsForValue().set(CAPACITY_PREFIX + productId, String.valueOf(capacity));
}
}
'개발 > 백엔드' 카테고리의 다른 글
Spring Circuit Breaker (0) | 2024.12.30 |
---|---|
Spring Rate Limiter (1) | 2024.12.26 |
Spring Kafka 기본 설정 및 실행 (2) | 2024.12.25 |
Kafka 기본 개념과 구조 (1) | 2024.12.25 |
Spring Redis 기본 설정 (1) | 2024.12.23 |