실시간 인기글 집계 시스템

이벤트 반영 실시간 스트림

Kafka 이벤트 스트림으로 당일 인기글 Top 10을 실시간 집계하는 시스템

JavaSpringKafkaRedisOutbox

아키텍처

이벤트 서비스좋아요/댓글/조회저장Outbox + Relay이벤트 발행발행Kafka이벤트 스트림구독인기글 서비스점수 집계RedisZSet

개요

  • 좋아요, 댓글, 조회 이벤트를 기반으로 당일 인기글 점수 집계
  • Kafka 이벤트 스트림과 Redis Sorted Set을 이용한 Top 10 갱신

문제

  • 매일 0시 전수 계산 방식으로는 실시간 인기글 반영 불가
  • 좋아요·댓글·조회 등 점수 요소가 여러 서비스에 분산되어 실시간 집계 필요

해결 전략

  • 게시글 생성·삭제, 좋아요, 댓글, 조회 이벤트를 Kafka로 발행
  • 인기글 서비스에서 좋아요·댓글·조회 수에 가중치를 곱해 점수를 직접 계산
  • Transactional Outbox로 DB 변경과 이벤트 발행을 한 트랜잭션으로 묶고, 발행 실패·미실행으로 남은 미발행 이벤트는 폴링 relay가 재발행해 유실 구간 보완

기술 선택 이유

  • Kafka

    • 여러 서비스에서 발생한 이벤트를 순차 로그로 수집하기에 적합
    • 디스크 순차 쓰기와 파티션 분산 구조로 대량 이벤트를 높은 처리량으로 처리
    • Consumer 장애 후 Offset 기반 재처리 가능
  • Transactional Outbox

    • DB 커밋과 메시지 발행을 한 트랜잭션으로 묶기 어려운 구조를, 커밋과 동일 트랜잭션에 Outbox를 저장해 보완
    • 커밋 직후 fast-path 발행으로 지연을 낮추고, 발행 실패·미실행으로 남은 Outbox는 폴링 relay가 재발행해 at-least-once 보장
    • 다중 인스턴스는 Coordinator의 Redis 기반 샤드 분배로 같은 Outbox를 중복 폴링하지 않도록 조정
  • Redis Sorted Set

    • 인기글 점수 갱신과 Top N 조회를 같은 자료구조에서 처리 가능
    • 애플리케이션에서 점수를 직접 계산해 ZSet에 반영하고 ZREVRANGE로 Top N 조회

핵심 흐름

  • 비즈니스 트랜잭션 안에서 Outbox 이벤트 저장 (BEFORE_COMMIT)
  • 커밋 직후 fast-path로 Kafka 발행을 시도하고, 성공 시에만 Outbox 삭제 (AFTER_COMMIT, 비동기)
  • 앱 비정상 종료·발행 실패 등으로 남은 미발행 Outbox는 폴링 relay(@Scheduled)가 주기적으로 재발행
  • MessageRelayCoordinator가 Redis ZSet 하트비트(ping 3초, 3회 누락 시 제외)로 살아있는 인스턴스를 추적하고 샤드를 분배해, 폴링 relay가 인스턴스 간 중복 없이 동작
  • 인기글 서비스가 점수를 직접 계산해 Redis ZSet(Top 10)을 갱신

핵심 코드

  • 커밋 전 Outbox 저장(BEFORE_COMMIT) → 커밋 후 fast-path 발행(AFTER_COMMIT) → 미발행분 폴링 재발행(@Scheduled)
// MessageRelay
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void createOutbox(OutboxEvent outboxEvent) {
    outboxRepository.save(outboxEvent.getOutbox());   // 비즈니스 트랜잭션 안에서 Outbox 저장
}

@Async("messageRelayPublishEventExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void publishEvent(OutboxEvent outboxEvent) {   // 커밋 성공 후 fast-path 발행
    publishEvent(outboxEvent.getOutbox());
}

private void publishEvent(Outbox outbox) {
    try {
        messageRelayKafkaTemplate.send(
                outbox.getEventType().getTopic(),
                String.valueOf(outbox.getShardKey()),
                outbox.getPayload()
        ).get(1, TimeUnit.SECONDS);
        outboxRepository.delete(outbox);              // 발행 성공 시에만 삭제
    } catch (Exception e) {
        log.error("[MessageRelay.publishEvent] outbox = {}", outbox, e);
        // 실패 시 삭제하지 않음 → 폴링 relay가 재발행
    }
}

// 미발행으로 남은(생성 10초 경과) Outbox를 주기적으로 재발행
@Scheduled(fixedDelay = 10, initialDelay = 5, timeUnit = TimeUnit.SECONDS,
           scheduler = "messageRelayPublishPendingExecutor")
public void publishPendingEvent() {
    AssignedShard assignedShard = messageRelayCoordinator.assignedShards();
    for (Long shard : assignedShard.getShards()) {    // 내 인스턴스에 배정된 샤드만 폴링
        List<Outbox> outboxes = outboxRepository
                .findAllByShardKeyAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
                        shard, LocalDateTime.now().minusSeconds(10), Pageable.ofSize(100));
        outboxes.forEach(this::publishEvent);
    }
}

배운 점

  • @TransactionalEventListener로 비즈니스 트랜잭션 안에서 Outbox를 저장하고, 커밋 성공 후에만 발행해 DB 변경과 메시지 발행의 정합성 확보
  • fast-path 발행 + 미발행 Outbox 폴링 재발행 + 발행 성공 시에만 삭제의 조합으로 at-least-once 보장 (발행 실패분도 테이블에 남아 재발행됨)
  • at-least-once는 중복 수신을 전제하므로, 소비 측 멱등 처리(이벤트 ID 기반 중복 제거)가 점수 정합성을 위한 다음 보완 지점
  • Redis Sorted Set으로 점수 갱신과 Top N 조회를 한 자료구조에서 단순하게 처리

소스 코드

GitHub에서 전체 코드 보기 ↗