실시간 인기글 집계 시스템
이벤트 반영 실시간 스트림Kafka 이벤트 스트림으로 당일 인기글 Top 10을 실시간 집계하는 시스템
JavaSpringKafkaRedisOutbox
아키텍처
개요
- 좋아요, 댓글, 조회 이벤트를 기반으로 당일 인기글 점수 집계
- Kafka 이벤트 스트림과 Redis Sorted Set을 이용한 Top 10 갱신
문제
- 매일 0시 전수 계산 방식으로는 실시간 인기글 반영 불가
- 좋아요·댓글·조회 등 점수 요소가 여러 서비스에 분산되어 실시간 집계 필요
해결 전략
- 게시글 생성·삭제, 좋아요, 댓글, 조회 이벤트를 Kafka로 발행
- 인기글 서비스에서 좋아요·댓글·조회 수에 가중치를 곱해 점수를 직접 계산
- Transactional Outbox로 DB 변경과 이벤트 발행을 한 트랜잭션으로 묶고, 발행 실패·미실행으로 남은 미발행 이벤트는 폴링 relay가 재발행해 유실 구간 보완
기술 선택 이유
-
Kafka
- 여러 서비스에서 발생한 이벤트를 순차 로그로 수집하기에 적합
- 디스크 순차 쓰기와 파티션 분산 구조로 대량 이벤트를 높은 처리량으로 처리
- Consumer 장애 후 Offset 기반 재처리 가능
-
Transactional Outbox
- DB 커밋과 메시지 발행을 한 트랜잭션으로 묶기 어려운 구조를, 커밋과 동일 트랜잭션에 Outbox를 저장해 보완
- 커밋 직후 fast-path 발행으로 지연을 낮추고, 발행 실패·미실행으로 남은 Outbox는 폴링 relay가 재발행해 at-least-once 보장
- 다중 인스턴스는 Coordinator의 Redis 기반 샤드 분배로 같은 Outbox를 중복 폴링하지 않도록 조정
-
Redis Sorted Set
- 인기글 점수 갱신과 Top N 조회를 같은 자료구조에서 처리 가능
- 애플리케이션에서 점수를 직접 계산해 ZSet에 반영하고
ZREVRANGE로 Top N 조회
핵심 흐름
- 비즈니스 트랜잭션 안에서 Outbox 이벤트 저장 (BEFORE_COMMIT)
- 커밋 직후 fast-path로 Kafka 발행을 시도하고, 성공 시에만 Outbox 삭제 (AFTER_COMMIT, 비동기)
- 앱 비정상 종료·발행 실패 등으로 남은 미발행 Outbox는 폴링 relay(@Scheduled)가 주기적으로 재발행
- MessageRelayCoordinator가 Redis ZSet 하트비트(ping 3초, 3회 누락 시 제외)로 살아있는 인스턴스를 추적하고 샤드를 분배해, 폴링 relay가 인스턴스 간 중복 없이 동작
- 인기글 서비스가 점수를 직접 계산해 Redis ZSet(Top 10)을 갱신
핵심 코드
- 커밋 전 Outbox 저장(BEFORE_COMMIT) → 커밋 후 fast-path 발행(AFTER_COMMIT) → 미발행분 폴링 재발행(@Scheduled)
// MessageRelay
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void createOutbox(OutboxEvent outboxEvent) {
outboxRepository.save(outboxEvent.getOutbox()); // 비즈니스 트랜잭션 안에서 Outbox 저장
}
@Async("messageRelayPublishEventExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void publishEvent(OutboxEvent outboxEvent) { // 커밋 성공 후 fast-path 발행
publishEvent(outboxEvent.getOutbox());
}
private void publishEvent(Outbox outbox) {
try {
messageRelayKafkaTemplate.send(
outbox.getEventType().getTopic(),
String.valueOf(outbox.getShardKey()),
outbox.getPayload()
).get(1, TimeUnit.SECONDS);
outboxRepository.delete(outbox); // 발행 성공 시에만 삭제
} catch (Exception e) {
log.error("[MessageRelay.publishEvent] outbox = {}", outbox, e);
// 실패 시 삭제하지 않음 → 폴링 relay가 재발행
}
}
// 미발행으로 남은(생성 10초 경과) Outbox를 주기적으로 재발행
@Scheduled(fixedDelay = 10, initialDelay = 5, timeUnit = TimeUnit.SECONDS,
scheduler = "messageRelayPublishPendingExecutor")
public void publishPendingEvent() {
AssignedShard assignedShard = messageRelayCoordinator.assignedShards();
for (Long shard : assignedShard.getShards()) { // 내 인스턴스에 배정된 샤드만 폴링
List<Outbox> outboxes = outboxRepository
.findAllByShardKeyAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
shard, LocalDateTime.now().minusSeconds(10), Pageable.ofSize(100));
outboxes.forEach(this::publishEvent);
}
}
배운 점
@TransactionalEventListener로 비즈니스 트랜잭션 안에서 Outbox를 저장하고, 커밋 성공 후에만 발행해 DB 변경과 메시지 발행의 정합성 확보- fast-path 발행 + 미발행 Outbox 폴링 재발행 + 발행 성공 시에만 삭제의 조합으로 at-least-once 보장 (발행 실패분도 테이블에 남아 재발행됨)
- at-least-once는 중복 수신을 전제하므로, 소비 측 멱등 처리(이벤트 ID 기반 중복 제거)가 점수 정합성을 위한 다음 보완 지점
- Redis Sorted Set으로 점수 갱신과 Top N 조회를 한 자료구조에서 단순하게 처리