Consumer Process 를 개발하는데 Spring Kafka
를 사용하였다. Consumer Config 를 생성 시 사용되는 옵션 중 하나인 KafkaConsumerFactory
에 대한 설정값을 알아본다.
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
true
: Consumer 가 반환한 메시지의 오프셋을 Kafka에 주기적으로 커밋함. 커밋된 오프셋은 프로세스가 실패할 때 Consuming 이 시작될 위치로 사용됨
Kafka에 초기 오프셋이 없거나 오프셋이 범위를 벗어난 경우 수행할 작업을 지정한다.
smallest
: 가장 작은 오프셋으로 자동 재설정largest
: 가장 큰 오프셋으로 자동 재설정disable
: Consumer group 에 대한 이전 offset이 발견되지 않으면 Consumer에게 예외를 발생시킨다.anything else
: Consumer에게 예외를 발생시킨다.
※ 본 문서는 지속적으로 업데이트될 예정입니다. 틀린 부분이 있다면 댓글로 피드백 부탁드립니다.
[algorithm] 순열(Permutation) 정리 (0) | 2020.01.26 |
---|---|
[Git] git ssh clone시 'The authenticity of host ~ can't be established 오류 해결하는 방법 (1) | 2020.01.17 |
[Architecture] 이벤트 버스 패턴(Event bus pattern) 정리 (0) | 2020.01.13 |
[OpenShift] 오픈시프트 간단한 소개 / 장점 / 아키텍처 정리 (0) | 2020.01.10 |
[Hive] is not , <> , ! = 조건식의 차이점을 알아보자. (0) | 2020.01.03 |
댓글 영역