상세 컨텐츠

본문 제목

[Kafka] Spring kafka의 KafkaConsumerFactory의 옵션 값을 알아보자.

IT/프로그래밍

by James Lee. 2020. 1. 13. 19:00

본문

Consumer Process 를 개발하는데 Spring Kafka 를 사용하였다. Consumer Config 를 생성 시 사용되는 옵션 중 하나인 KafkaConsumerFactory 에 대한 설정값을 알아본다.

 

Example

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

 

ENABLE_AUTO_COMMIT_CONFIG

true : Consumer 가 반환한 메시지의 오프셋을 Kafka에 주기적으로 커밋함. 커밋된 오프셋은 프로세스가 실패할 때 Consuming 이 시작될 위치로 사용됨

 

AUTO_OFFSET_RESET_CONFIG

Kafka에 초기 오프셋이 없거나 오프셋이 범위를 벗어난 경우 수행할 작업을 지정한다.

  • smallest: 가장 작은 오프셋으로 자동 재설정
  • largest: 가장 큰 오프셋으로 자동 재설정
  • disable: Consumer group 에 대한 이전 offset이 발견되지 않으면 Consumer에게 예외를 발생시킨다.
  • anything else: Consumer에게 예외를 발생시킨다.

 

참고 문서

 

※ 본 문서는 지속적으로 업데이트될 예정입니다. 틀린 부분이 있다면 댓글로 피드백 부탁드립니다.

관련글 더보기

댓글 영역