기본 정책
- Spring-Kafka를 통해서 Consumer를 구현하고, Consumer에서 처리하는 과정에서 오류가 발생하면 기본 설정으로 최초 요청을 포함해 최대 10회까지 재시도를 한다.
- 모든 재시도 실패하게 되면 해당 메시지는 skip 처리 된다.
Spring-Kafka 에러 핸들링
- spring-kafka 2.8 이전 버전에서는 KafkaListenerContainerFactory에 RetryTemplate()을 설정하여 재시도 처리를 진행했으나, 2.8 이후 버전에서는 Deprecated 되었다.
- DefaultErrorHandler를 생성하여 에러를 처리할 수 있다.
실습
환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Member>> kafkaJsonContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Member> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(memberFactory());
factory.setCommonErrorHandler(customErrorHandler());
return factory;
}
private DefaultErrorHandler customErrorHandler() {
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
log.error("[Error] topic = {}, key = {}, value = {}, error message = {}", consumerRecord.topic(),
consumerRecord.key(), consumerRecord.value(), exception.getMessage());
}, new FixedBackOff(1000L, 10)); // 1초 간격으로 최대 10번
errorHandler.addNotRetryableExceptions(CustomException.class); // retry X
return errorHandler;
}
- KafkaListenerContainerFactory에 CommonErrorHandelr를 설정한다.
- DefaultErrorHandler를 생성한다.
- 첫 번째 인자는 에러 처리 함수, 두 번째 인자로 정책을 정의한다.
- .addNotRetryableExceptions()에 정의한 Exception은 retry 하지 않는다.
@Component
public class Advanced2Consumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(id = "test-member-id", topics = "test-member", containerFactory = "kafkaJsonContainerFactory")
public void listenMember(@Valid Member member) {
log.info("[Consumer] member = {}", member.toString());
}
}
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(Advanced2Producer advanced2Producer) {
return args -> {
advanced2Producer.asyncMemberSend(new Member("retry", 10));
};
}
}
- Consumer에서 @Valid로 Member의 email 형식을 체크한다.
- Member의 이메일을 retry로 생성하고 메시지를 전송한다.
- Validation Error가 발생하고 ErrorHandler에 의해 다음과 같은 로그가 출력된다.
ErrorHandler로 단순히 로그를 남기도록 처리하는 것이 아닌 에러 발생 시 에러 값을 DB에 저장하는 등 다른 작업을 진행할 수 있다.
'Kafka' 카테고리의 다른 글
[Kafka] Apache Kafka Connect (0) | 2022.12.07 |
---|---|
[Spring + Kafka] Spring-Kafka-Streams 간단한 실습 (0) | 2022.12.05 |
[Spring + Kafka] 간단 성능 모니터링 (0) | 2022.12.02 |
[Spring + Kafka] AdminKafka 사용하기 (0) | 2022.12.01 |
[Spring + Kafka] Consume Messages (0) | 2022.11.29 |