본문 바로가기

Kafka

[Spring + Kafka] 에러 핸들링

기본 정책

  • Spring-Kafka를 통해서 Consumer를 구현하고, Consumer에서 처리하는 과정에서 오류가 발생하면 기본 설정으로 최초 요청을 포함해 최대 10회까지 재시도를 한다.
  • 모든 재시도 실패하게 되면 해당 메시지는 skip 처리 된다.

 

Spring-Kafka 에러 핸들링

  • spring-kafka 2.8 이전 버전에서는 KafkaListenerContainerFactory에 RetryTemplate()을 설정하여 재시도 처리를 진행했으나, 2.8 이후 버전에서는 Deprecated 되었다.
  • DefaultErrorHandler를 생성하여 에러를 처리할 수 있다.

 

실습

환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Member>> kafkaJsonContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Member> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(memberFactory());
    factory.setCommonErrorHandler(customErrorHandler());
    return factory;
}

private DefaultErrorHandler customErrorHandler()  {
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        log.error("[Error] topic = {}, key = {}, value = {}, error message = {}", consumerRecord.topic(), 
                    consumerRecord.key(), consumerRecord.value(), exception.getMessage());
    }, new FixedBackOff(1000L, 10)); // 1초 간격으로 최대 10번
    errorHandler.addNotRetryableExceptions(CustomException.class); // retry X

    return errorHandler;
}
  • KafkaListenerContainerFactory에 CommonErrorHandelr를 설정한다.
  • DefaultErrorHandler를 생성한다.
  • 첫 번째 인자는 에러 처리 함수, 두 번째 인자로 정책을 정의한다.
  • .addNotRetryableExceptions()에 정의한 Exception은 retry 하지 않는다.

 

@Component
public class Advanced2Consumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(id = "test-member-id", topics = "test-member", containerFactory = "kafkaJsonContainerFactory")
    public void listenMember(@Valid Member member) {
        log.info("[Consumer] member = {}", member.toString());
    }
}

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}


	@Bean
	public ApplicationRunner runner(Advanced2Producer advanced2Producer) {
		return args -> {
			advanced2Producer.asyncMemberSend(new Member("retry", 10));
		};
	}
}
  • Consumer에서 @Valid로 Member의 email 형식을 체크한다.
  • Member의 이메일을 retry로 생성하고 메시지를 전송한다.
  • Validation Error가 발생하고 ErrorHandler에 의해 다음과 같은 로그가 출력된다.


ErrorHandler로 단순히 로그를 남기도록 처리하는 것이 아닌 에러 발생 시 에러 값을 DB에 저장하는 등 다른 작업을 진행할 수 있다.