본문 바로가기

카테고리 없음

Apache Kafka 정의 및 주요 특성 종합 정리

Apache Kafka는 실시간 데이터 스트리밍과 이벤트 기반 시스템 구축에 최적화된 분산 메시징 플랫폼이자 스트림 처리 시스템입니다.
수많은 데이터 원본으로부터 이벤트를 신속하고 안정적으로 수집·저장·분배하여, 다양한 애플리케이션 간 데이터 통합과 실시간 분석을 가능하게 합니다.

1. Kafka의 핵심 개념

  • 이벤트 중심(Event-Oriented)
    이벤트(메시지)를 중심으로 시스템이 구성되며, 발생한 이벤트를 실시간으로 수집하고 처리합니다.
  • Publish-Subscribe 모델
    • Producer(생산자)가 메시지를 특정 Topic(주제)에 발행(publish)
    • Consumer(소비자)가 원하는 Topic을 구독(subscribe)하여 메시지를 실시간으로 받음
    • 메시지 소비 후에도 삭제되지 않아 여러 Consumer가 독립적으로 재처리 가능
  • 분산 아키텍처
    여러 대의 브로커(서버)로 구성된 클러스터에서 데이터가 분산 저장되고 처리됩니다

2. 주요 특징

확장성 (Scalability)

  • 데이터를 여러 Partition으로 나누고, 이를 다수의 브로커에 분산 저장
  • 서버(브로커)를 추가해 수평적 확장(Scale-out) 가능
  • 수천 대의 브로커, 수십만 개 Partition 지원 가능

신속함 (Low Latency & High Throughput)

  • 병렬 처리와 효율적 I/O로 초당 수백만 건 메시지 처리
  • 실시간 데이터 스트리밍에 적합

내구성 및 내결함성 (Durability & Fault Tolerance)

  • 메시지는 디스크에 영속 저장
  • 파티션은 복제되어 다중 서버에 분산 저장, 장애 시 데이터 손실 방지
  • ‘최소 한 번 전달(at-least-once)’ 기본 보장, ‘정확히 한 번 처리(exactly-once)’ 기능도 지원

로그 기반 저장 구조 (Log-Based Storage)

  • 메시지를 삭제하지 않고, 오프셋(offset)으로 관리
  • 소비자가 원하는 시점부터 자유롭게 메시지 재처리 가능
  • 메시지 필터링 및 다양한 소비자 처리 유연성 확보

실시간 스트림 처리 지원

  • Kafka Streams 라이브러리로 애플리케이션 내부에서 분산 스트림 처리 가능
  • Kafka Connect를 통해 외부 시스템과의 데이터 연동 및 통합 편리

마이크로서비스 및 비동기 통합 지원

  • 느슨한 결합 구조로 마이크로서비스 간 동기 호출 부담 완화
  • 이벤트 드리븐 아키텍처 구현에 최적화

3. Kafka의 활용 사례

  • 실시간 로그 및 이벤트 수집 및 분석
  • 대규모 메시징 및 이벤트 버스
  • 마이크로서비스 간 비동기 데이터 연동
  • 실시간 추천, 모니터링, 경고 시스템
  • 실시간 ETL 및 데이터 파이프라인 구축

4. 추가 인사이트

  • Kafka는 단순 메시징 시스템을 넘어 ‘실시간 데이터 파이프라인’과 ‘스트림 처리 플랫폼’ 역할을 합니다.
  • 분산 로그 저장 및 복제 구조 덕분에 데이터 안정성과 가용성이 높습니다.
  • ‘정확히 한 번 처리’ 기능으로 금융·결제 등 중요한 시스템에서도 신뢰성 확보 가능.
  • Kafka Connect와 Kafka Streams를 활용하면 개발자가 복잡한 데이터 통합과 처리 로직을 간편하게 구현할 수 있습니다.

5. 참고 출처

Kafka 기반 재난 대응 메시징 시스템 구조 설계 총정리

1. 시스템 개요 및 목적

  • 목적: 재난 발생 시 실시간으로 이벤트(신고, 센서 데이터 등)를 수집, 필터링, 통계 집계 후, 사용자에게 맞춤형 알림(Firebase Cloud Messaging, FCM) 전달
  • 핵심 요구사항
    • 대규모 재난 이벤트 안정적 처리 및 확장성 확보
    • 실시간 및 신속한 메시지 처리와 이벤트 필터링 기능
    • 통계 집계(예: 이벤트 발생 횟수) 연동 및 사용자 알림 가능
    • 내결함성과 데이터 내구성 보장
    • 병렬 소비자 처리를 통한 고성능 처리
    • FCM 연동을 통한 맞춤형 푸시 알림 전

2. Kafka의 구조적 특성 및 재난 대응 시스템 연관성

특성 설명  재난 대응  시스템 적용
이벤트 중심(Event-Oriented) 이벤트 단위로 메시지를 발행하고 소비 재난 신고, 센서 데이터 등 이벤트를 실시간 발행하여 신속 대응
분산 아키텍처 여러 브로커에 데이터 분산 저장 및 복제 수많은 재난 데이터를 여러 브로커에 분산해 시스템 확장과 내결함성 확보
Partition 기반 확장성 토픽을 여러 파티션으로 분리, 병렬 처리 가능 파티션 단위로 이벤트 필터링, 통계 집계 consumer를 병렬 처리하여 처리량 증가
로그 기반 메시지 저장 메시지를 삭제하지 않고 오프셋으로 관리 이벤트 재처리, 분석용 로그 저장 가능, 장애 복구 용이
실시간 스트림 처리 지원 Kafka Streams, Connect 활용 가능 필터링, 집계, 외부 시스템(Firebase, Redis) 연동을 스트림 형태로 처리 가능
     
내구성 및 내결함성 디스크 영속 저장, 복제 및 장애 복구 재난 이벤트 데이터 유실 없이 안전하게 저장, 장애 발생 시에도 서비스 유지

 

3. 시스템 아키텍처 구성 요소 및 데이터 흐름

[재난 이벤트 발생원(Producer)]
        └▶ 이벤트 발행 ▶ [Kafka 클러스터: DisasterEvents Topic]
        ├─ [이벤트 필터링 Consumer] → 필터링된 이벤트 선별 → 알림 대상 추출 → FCM 알림 서버 전송
       ├─ [통계 Consumer] → 이벤트 발생 횟수 집계 → Redis(통계 저장소) 저장
       └─ [로그 저장 Consumer] → 이벤트 로그 데이터 영속화 (분석, 재처리 목적)

        [FCM 알림 서버]
        └▶ 사용자 디바이스에 맞춤형 재난 알림 전달
  • Producer: 재난 신고 앱, 센서, 외부 API 등 이벤트 생성 주체
  • Kafka Broker: 이벤트를 분산 저장하고, 소비자가 독립적으로 메시지 소비 가능
  • Consumer 그룹:
    • 필터링 Consumer: 이벤트 속성에 따라 알림 대상자 결정 (ex. 특정 지역 사용자만)
    • 통계 Consumer: 이벤트 횟수, 발생 패턴 등 집계 (Redis에 저장하여 실시간 통계 관리)
    • 로그 Consumer: 이벤트 데이터를 장기 저장소나 분석용 DB에 저장
  • FCM 서버: 필터링 Consumer가 전달한 알림 정보를 FCM API 통해 사용자 디바이스로 푸시 알림 전송
  • Redis: 통계 집계 및 알림 횟수 관리 (1회, 10회, 30회 등 사용자 맞춤 알림 조건 판단에 활용)

 

4. 필터링 및 통계 처리

  • Kafka의 PartitionConsumer Group 구조 덕분에 필터링 Consumer를 여러 개 띄워 병렬 처리 가능
  • 이벤트 데이터의 속성 (지역, 재난 유형 등)에 따라 필터링 로직 수행
  • 통계 Consumer는 Kafka 메시지를 받아 Redis 등 메모리 기반 DB에 집계값 저장 → 빠른 읽기 및 알림 조건 판단 지원
  • 통계 결과(예: 사용자별 알림 횟수)를 기반으로 FCM 알림 전송 조건 제어 가능 (알림 중복 방지 및 사용자 경험 향상)

5. FCM 연동 및 사용자 알림

  • 필터링 Consumer는 알림 대상 이벤트 추출 후 별도의 알림 큐 또는 API 서버에 전송
  • 알림 서버는 FCM과 연동해 각 사용자에게 재난 알림을 푸시 전송
  • Kafka Consumer → Redis 통계 → FCM 알림 연계 구조로 알림의 정확성 및 실시간성 확보
  • 알림 로그 저장을 통해 전송 성공/실패 여부, 재전송 로직 구현 가능

6. 확장성 및 내결함성 고려

  • Kafka 클러스터는 서버(브로커) 증설로 손쉽게 확장 가능
  • 파티션 수 조절을 통해 병렬 처리량 조절 및 장애 격리
  • 메시지는 디스크에 영속 저장되고 복제되어 장애 시 데이터 유실 방지
  • Kafka Consumer 그룹을 적절히 분리해 처리 작업별 독립성과 부하 분산 가능

7. 한계 및 고려사항

  • 파티션이 너무 세분화될 경우 Consumer 관리 복잡도 증가 및 메시지 순서 보장 어려움
  • FCM 알림의 전송 실패나 지연 가능성 대비 재시도 로직 필요
  • Redis 기반 통계 집계는 메모리 사용량 관리와 장애 대비 백업 전략 필요
  • 실시간 필터링과 통계 집계 로직의 성능 최적화 중요 (특히 재난 상황 급증 시)

8. 결론 및 내 목적 요약

Kafka의 강력한 분산 로그 저장 및 스트림 처리 구조를 활용해 대규모 재난 이벤트를 안정적이고 확장성 있게 처리하며, 필터링과 통계 집계 정보를 기반으로 FCM 알림을 맞춤 전송하는 시스템을 설계한다. 이를 통해 실시간 재난 알림 서비스의 신뢰성과 유연성을 확보하고, 사용자 경험을 극대화한다.


9. 출처

 

10. 실제  코드(수정 필요)

더보기

✅ 1. DisasterEvent.java (DTO)

public class DisasterEvent {
    private String eventType;       // fire, flood, earthquake, epfh 등
    private String region;          // eg. 서울시 강남구
    private String level;           // LOW, MEDIUM, HIGH
    private LocalDateTime timestamp;
    private Map<String, Object> payload;  // 부가정보

    // Getters / Setters 생략
}

 

✅ 2. Kafka 설정 KafkaConfig.java

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, DisasterEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "disaster-stat-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DisasterEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, DisasterEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 병렬 소비자 수
        return factory;
    }

    @Bean
    public ProducerFactory<String, DisasterEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, DisasterEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

✅ 3. Producer DisasterEventProducer.java

@Service
@RequiredArgsConstructor
public class DisasterEventProducer {

    private final KafkaTemplate<String, DisasterEvent> kafkaTemplate;

    public void sendDisasterEvent(DisasterEvent event) {
        String topic = "disaster." + event.getEventType().toLowerCase(); // eg. disaster.fire
        kafkaTemplate.send(topic, event);
        System.out.println("✅ Sent event to topic: " + topic);
    }
}

✅ 4. Consumer (통계 + FCM 로직 포함)

@Component
@RequiredArgsConstructor
public class DisasterStatsConsumer {

    private final RedisTemplate<String, Integer> redisTemplate;
    private final FcmService fcmService;

    private static final int ALERT_THRESHOLD = 3;

    @KafkaListener(
            topics = {"disaster.fire", "disaster.flood", "disaster.earthquake", "disaster.epfh"},
            groupId = "disaster-stat-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(DisasterEvent event) {
        String type = event.getEventType().toLowerCase();
        String region = event.getRegion();
        String redisKey = "count:" + type + ":" + region;

        Long count = redisTemplate.opsForValue().increment(redisKey);

        System.out.printf("📊 [%s] %s → %d회 발생\n", type, region, count);

        // ✅ 알림 임계치 도달 시 FCM 전송
        if (count != null && count == ALERT_THRESHOLD) {
            fcmService.sendToRegion(region, "[경고] " + type.toUpperCase() + " 재난 " + ALERT_THRESHOLD + "회 발생!");
        }
    }
}

✅ 5. FCM 전송 서비스 (예시)

@Service
public class FcmService {

    public void sendToRegion(String region, String message) {
        // 실제 FCM 연동 로직 또는 외부 서비스 호출
        System.out.printf("📨 [FCM] %s 지역 → %s\n", region, message);
    }
}

 

✅ 6. Controller 테스트 (재난 등록)

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/disaster")
public class DisasterEventController {

    private final DisasterEventProducer producer;

    @PostMapping
    public ResponseEntity<Void> report(@RequestBody DisasterEvent event) {
        producer.sendDisasterEvent(event);
        return ResponseEntity.ok().build();
    }
}

 

🔁 Kafka 병렬 소비 팁

  • Kafka Consumer는 @KafkaListener(concurrency = "3")처럼 병렬로 동작 가능
  • Listener Container 설정을 통해 Thread Pool 조정 가능
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3); // 병렬 Consumer 3개
    return factory;
}

 

 Kafka 설정 (application.yml)

spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: disaster-consumer-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

✅ 결과 흐름 요약

  1. /api/disaster에 JSON으로 재난 보고
  2. Kafka Topic(disaster.fire 등)으로 분기
    - "Kafka Topic을 fire, flood, earthquake, epfh 등 이벤트 유형으로 나눈 상태"(topic의 예시일 뿐)
  3. Consumer 수신 → Redis에 지역별 count 증가
  4. count ≥ 3일 경우 → 해당 지역 사용자에게 FCM 알림 전송

💡 확장 아이디어

  • epfh 외에도 disaster.nuclear, disaster.typhoon 등 추가 가능
  • Redis TTL 설정으로 하루 단위 count 초기화
  • 특정 시간대 패턴 분석 → AI 기반 위험 등급 상승