본문 바로가기

Kafka

[Spring + Kafka] Consume Messages

Message Listener

Record MessageListener Auto Commit
  AcknowlegingMessageListener Manual Commit
  ConsumerAwareMessageListener Consumer 객체 활용
  AcknowledgingConsumerAwareMessageListener Manual Commit + Consumer 객체 활용
Batch BatchMessageListener Auto Commit
  BatchAcknowlegingMessageListener Manual Commit
  BatchConsumerAwareMessageListener Consumer 객체 활용
  BatchAcknowledgingConsumerAwareMessageListener Manual Commit + Consumer 객체 활용
  • Message Listener는 크게 2가지 타입으로 구분할 수 있다.
  • Record
    • 단일 메시지를 하나씩 처리한다.

  • Batch
    • Record를 List로 받아 여러 개의 메시지를  처리한다.

 

AckMode

Record 레코드를 처리한 후 리스너가 반환할 때 커밋한다.
Batch poll() 메서드(메세지를 가져올 때 사용하는 메서드)로 호출된 레코드가 모두 처리된 이후 커밋한다.
Spring Kafka Consumer의 AckMode 기본 값이다.
Time ackTime 만큼 지난 이후에 커밋한다.
시간 간격을 선언하는 ackTime 옵션을 설정해야 한다.
Count ackCount로 설정된 개수 만큼, 레코드가 처리된 이후 커밋한다.
레코드 개수를 선언하는 ackCount 옵션을 설정해야 한다.
Count_Time Count, Time 중 맞는 조건이 나오면 커밋한다.
Manual Acknowledgement.acknowledge() 메서드가 호출되며면 다음 번 poll() 메서드 호출 시 커밋한다.
매번, acknowledge()를 호출 하면, Batch 옵션과 동일한다.
AcknowledgingMessageListener, BatchAcknowledgingMessageListener를 사용해야 한다. 
Manual_Immediate Acknowledgement.acknowledge() 메서드가 호출되면 커밋한다.
AcknowledgingMessageListener, BatchAcknowledgingMessageListener를 사용해야 한다. 

 

Message Listener Container

  • Spring에서 2가지로 지원한다.
  • KafkaMessageListenerContainer
    • Single Thread

  • ConcurrentMessageListenerContainer
    • KafkaMessageListenerContainer 인스턴스를 1개 이상 사용하는 Multi-Thread
    • start, stop 등 메서드를 foreach로 순차적으로 실행

 

@KafkaListener

  • @EnableKafka를 사용하려면, @Configuration 중 Bean 이름이 kafkaListenerContainerFactory인 ConcurrentMeesageListenerContainer 객체가 필요하다.
  • Spring Boot에서는 모든 것이 기본 세팅되어 있다.
    • KafkaAutoConfiguration, ConcurrentKafkaListenerContianerFactoryConfigurer

  • 다양한 설정을 property로 설정할 수 있다.
    • clientIdPrefix, autoStartup, concurrency, topicPartitions...
    • concurreny: Thread 개수 지정
    • clientIdPrefix: consumer clientId prefix 지정
  • ConsumerRecordMetatdata를 이용해서 다양한 메타 데이터를 수신할 수 있다.

 

Payload Validator

  • 2.2 이후 버전부터 손쉽게 추가 가능하다.
  • KafkaListenerEndpointRegistrar에 등록해서 사용할 수 있다.
  • LocalValidatorFactoryBean을 사용하면, javax.validation(JSR-303)에 정의된 @NotEmpty, @Min, @Max, @Email과 같은 기본적인 유효성 체크도 사용할 수 있다.

 

Retrying Deliveries

  • 기본적으로 리스너에서 에러가 발생하면, Container Error Handler가 동작하게 된다.
  • RetryingMessageListenerAdapter를 사용해서 Retry 기능을 호출할 수 있다.
  • RetryTemplate, RecoveryCallback<Void>를 Container Factory에 설정하여 사용한다.
    • RecoveryCallback이 설정되지 않으면, 모든 재시도가 실패 시 Container Error가 발생한다.

 

Retry Stateful

  • BackOffPolicy를 이용해 재시도 하는 과정에서 Consumer Thread가 중지될 수 있다.
    • 재시도 하는동안에는 poll()이 수행되지 않는다.
    • session.timeout.ms: 설정된 시간 안에 heartbeat를 받지 못하면, Consumer Group에서 제거하고 rebalance가 발생한다.
    • max.poll.interval.ms: 설정된 시간 안에 poll()이 호출되지 않으면, Consumer가 죽었다고 판단해서 할당된 파티션이 revoke되고 rebalance가 발생한다.

환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10

 

MessageListenerContainer 생성

@Configuration
public class MessageListenerContainerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaMessageListenerContainer messageListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties("test-event");
        containerProps.setGroupId("test-event-container");
        containerProps.setAckMode(ContainerProperties.AckMode.BATCH);
        containerProps.setMessageListener(new DefaultMessageListener());
        
        KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(containerFactory(), containerProps);
        container.setAutoStartup(false); // 자동으로 시작하는 것이 아닌, 내가 원할 때 실행
        
        return container;
    }

    private ConsumerFactory containerFactory() {
        return new DefaultKafkaConsumerFactory(props());
    }

    private Map<String, Object> props() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

public class DefaultMessageListener implements MessageListener<String, String> {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        log.info("[DefaultMessageListener] data = {}", data.value());
    }
}
  • containerProps.setGroupId()를 설정해야 Client Id가 발급될 수 있다.
  • containerProps.setAckMode()를 통해, AckMode를 설정할 수 있다.
  • containerProps.setMessageListener()에 내가 정의한 MessageListener를 등록할 수 있다.
  • container.setAutoStartup(false) 설정으로 AutoStart를 끄면, Application 구동 시점이 아닌 start() 메서드가 호출된 시점에서 container가 구동한다.

 

@Component
public class Advanced2Producer {

    @Value("${spring.kafka.topic.test}")
    private String topic;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public Advanced2Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void asyncSend(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new KafkaSendCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Success to send message.");
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
                log.error("Fail to send message.", failedProducerRecord);
            }
        });
    }
}

@Component
public class Advanced2Consumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}")
    public void listen(String message) {
        log.info("[Consumer] message = {}", message);
    }
}
  • 다음과 같이 Producer, Consumer를 작성한다.

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(Advanced2Producer advanced2Producer,
									KafkaMessageListenerContainer kafkaMessageListenerContainer) {
		return args -> {
			kafkaMessageListenerContainer.start();

			advanced2Producer.asyncSend("test-message5.");
		};
	}
}
  • kafkaMessageListenerContainer.start()를 통해 Container를 구동할 수 있다.
  • ApplicationRunner에 의해 advanced2Producer.asyncSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.

 

 

@KafkaListener 사용하기

@Component
public class Advanced2Consumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}", concurrency = "2", clientIdPrefix = "test-id")
    public void listen(String message,
                       ConsumerRecordMetadata metadata) {
        log.info("[Consumer] message = {}", message);
        log.info("[Consumer] offset = {}", metadata.offset());
    }
    
    @KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}", concurrency = "2", clientIdPrefix = "test-id")
    public void listen(String message,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
                       @Header(KafkaHeaders.REPLY_PARTITION) int partition,
                       @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[Consumer] message = {}", message);
        log.info("[Consumer] timestamp = {}", timestamp);
        log.info("[Consumer] partition = {}", partition);
        log.info("[Consumer] offset = {}", offset);
    }
}
  • 다양한 property를 사용할 수 있다.
  • ConsumerRecordMetadata metadata를 통해 다양한 정보를 확인할 수 있다.
  • @Header(KafkaHeaders.*)을 통해 정보를 가져올 수 있다.

 

KafkaJsonListenerContainer 사용하기

@Configuration
public class kafkaJsonConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaAdmin.NewTopics newTopics() {
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name("test-member").build()
        );
    }

    @Bean
    public KafkaTemplate<String, Member> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    private ProducerFactory<String, Member> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProps());
    }

    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
}

@Configuration
public class KafkaJsonListenerContainerConfiguration implements KafkaListenerConfigurer {

    private final LocalValidatorFactoryBean localValidatorFactoryBean;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public KafkaJsonListenerContainerConfiguration(LocalValidatorFactoryBean localValidatorFactoryBean) {
        this.localValidatorFactoryBean = localValidatorFactoryBean;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Member>> kafkaJsonContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Member> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(memberFactory());

        return factory;
    }

    private ConsumerFactory<String, Member> memberFactory() {
        return new DefaultKafkaConsumerFactory(
                props(),
                new StringDeserializer(),
                new JsonDeserializer<>(Member.class));
    }

    private Map<String, Object> props() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(localValidatorFactoryBean);
    }
}
  • JsonSerializer, JsonDeserializer를 사용하는 KafkaTemplate,KafkaListenerContainerFactory를 생성해서 Bean으로 등록한다.
  • DefaultKafkaConsumerFactory를 생성할 때, new JsonDesrializer<>({Type Class}) Class를 지정하지 않으면, 신뢰할 수 없는 도메인 오류가 발생한다
  • Validation 적용을 위해 KafkaListnerConfugurer를 상속 받고, confiugurerKafkaListners 메서드를 오버라이드 한 뒤, LocalValidationFactoryBean을 Validator로 설정하여 Spring Validator가 적용되게 한다.

 

@Component
public class Advanced2Consumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(id = "test-member-id", topics = "test-member")
    public void listenMember(@Valid Member member) {
        log.info("[Consumer] member = {}", member.toString());
    }
}

@Component
public class Advanced2Producer {

    @Value("${spring.kafka.topic.test}")
    private String topic;
    private final KafkaTemplate<String, Member> kafkaMemberTemplate;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public Advanced2Producer(KafkaTemplate<String, Member> kafkaMemberTemplate) {
        this.kafkaMemberTemplate = kafkaMemberTemplate;
    }

    public void asyncMemberSend(Member member) {
        ListenableFuture<SendResult<String, Member>> future = kafkaMemberTemplate.send("test-member", "member-id", member);
        future.addCallback(new KafkaSendCallback<>() {
            @Override
            public void onSuccess(SendResult<String, Member> result) {
                log.info("Success to send message.");
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
                log.error("Fail to send message.", failedProducerRecord);
            }
        });
    }

}
  • Consumer, Producer를 작성한 후, Application을 실행하면 Cannot convert from [java.lang.String] to [Member.class] ... 오류가 발생한다.
  • 해당 오류는 위에서 KafkaListnerContainerFactory를 Bean 등록할 때, kafkaListenerContainerFactory로 등록하지 않아 발생하는 오류이다.
  • 해당 오류를 해결하기 위해서는 Consumser에서 @KafkaListener에 containerFactory를 지정해주는 방법이 있다.
@KafkaListener(id = "test-member-id", topics = "test-member", containerFactory = "kafkaJsonContainerFactory")

 

public class Member {
    @Email
    private String email;
    private int age;

    public Member() {
    }

    public Member(String email, int age) {
        this.email = email;
        this.age = age;
    }

    public String getEmail() {
        return email;
    }

    public int getAge() {
        return age;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Member{" +
                "email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}

@SpringBootApplication
public class KafkaStudyApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(Advanced2Producer advanced2Producer) {
		return args -> {
			advanced2Producer.asyncMemberSend(new Member("test", 10));
		};
	}
}
  • ApplicationRunner에 의해 advanced2Producer.asyncMemberSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void springkafkastudy.kafkastudy.consumer.Advanced2Consumer.listenMember(springkafkastudy.kafkastudy.model.Member): 1 error(s): [Field error in object 'member' on field 'email': rejected value [test]; codes [Email.member.email,Email.email,Email.java.lang.String,Email]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.email,email]; arguments []; default message [email],[Ljavax.validation.constraints.Pattern$Flag;@1be64748,.*]; default message [올바른 형식의 이메일 주소여야 합니다]] 
  • email이 이메일 주소 형식이 아니기 때문에 발생하는 오류이다.
  • advanced2Producer.asyncMemberSend(new Member("test@test.com", 10));으로 수정하면 다음과 같은 로그를 확인할 수 있다.