Message Listener
Record | MessageListener | Auto Commit |
AcknowlegingMessageListener | Manual Commit | |
ConsumerAwareMessageListener | Consumer 객체 활용 | |
AcknowledgingConsumerAwareMessageListener | Manual Commit + Consumer 객체 활용 | |
Batch | BatchMessageListener | Auto Commit |
BatchAcknowlegingMessageListener | Manual Commit | |
BatchConsumerAwareMessageListener | Consumer 객체 활용 | |
BatchAcknowledgingConsumerAwareMessageListener | Manual Commit + Consumer 객체 활용 |
- Message Listener는 크게 2가지 타입으로 구분할 수 있다.
- Record
- 단일 메시지를 하나씩 처리한다.
- 단일 메시지를 하나씩 처리한다.
- Batch
- Record를 List로 받아 여러 개의 메시지를 처리한다.
AckMode
Record | 레코드를 처리한 후 리스너가 반환할 때 커밋한다. |
Batch | poll() 메서드(메세지를 가져올 때 사용하는 메서드)로 호출된 레코드가 모두 처리된 이후 커밋한다. Spring Kafka Consumer의 AckMode 기본 값이다. |
Time | ackTime 만큼 지난 이후에 커밋한다. 시간 간격을 선언하는 ackTime 옵션을 설정해야 한다. |
Count | ackCount로 설정된 개수 만큼, 레코드가 처리된 이후 커밋한다. 레코드 개수를 선언하는 ackCount 옵션을 설정해야 한다. |
Count_Time | Count, Time 중 맞는 조건이 나오면 커밋한다. |
Manual | Acknowledgement.acknowledge() 메서드가 호출되며면 다음 번 poll() 메서드 호출 시 커밋한다. 매번, acknowledge()를 호출 하면, Batch 옵션과 동일한다. AcknowledgingMessageListener, BatchAcknowledgingMessageListener를 사용해야 한다. |
Manual_Immediate | Acknowledgement.acknowledge() 메서드가 호출되면 커밋한다. AcknowledgingMessageListener, BatchAcknowledgingMessageListener를 사용해야 한다. |
Message Listener Container
- Spring에서 2가지로 지원한다.
- KafkaMessageListenerContainer
- Single Thread
- Single Thread
- ConcurrentMessageListenerContainer
- KafkaMessageListenerContainer 인스턴스를 1개 이상 사용하는 Multi-Thread
- start, stop 등 메서드를 foreach로 순차적으로 실행
@KafkaListener
- @EnableKafka를 사용하려면, @Configuration 중 Bean 이름이 kafkaListenerContainerFactory인 ConcurrentMeesageListenerContainer 객체가 필요하다.
- Spring Boot에서는 모든 것이 기본 세팅되어 있다.
- KafkaAutoConfiguration, ConcurrentKafkaListenerContianerFactoryConfigurer
- KafkaAutoConfiguration, ConcurrentKafkaListenerContianerFactoryConfigurer
- 다양한 설정을 property로 설정할 수 있다.
- clientIdPrefix, autoStartup, concurrency, topicPartitions...
- concurreny: Thread 개수 지정
- clientIdPrefix: consumer clientId prefix 지정
- ConsumerRecordMetatdata를 이용해서 다양한 메타 데이터를 수신할 수 있다.
Payload Validator
- 2.2 이후 버전부터 손쉽게 추가 가능하다.
- KafkaListenerEndpointRegistrar에 등록해서 사용할 수 있다.
- LocalValidatorFactoryBean을 사용하면, javax.validation(JSR-303)에 정의된 @NotEmpty, @Min, @Max, @Email과 같은 기본적인 유효성 체크도 사용할 수 있다.
Retrying Deliveries
- 기본적으로 리스너에서 에러가 발생하면, Container Error Handler가 동작하게 된다.
- RetryingMessageListenerAdapter를 사용해서 Retry 기능을 호출할 수 있다.
- RetryTemplate, RecoveryCallback<Void>를 Container Factory에 설정하여 사용한다.
- RecoveryCallback이 설정되지 않으면, 모든 재시도가 실패 시 Container Error가 발생한다.
Retry Stateful
- BackOffPolicy를 이용해 재시도 하는 과정에서 Consumer Thread가 중지될 수 있다.
- 재시도 하는동안에는 poll()이 수행되지 않는다.
- session.timeout.ms: 설정된 시간 안에 heartbeat를 받지 못하면, Consumer Group에서 제거하고 rebalance가 발생한다.
- max.poll.interval.ms: 설정된 시간 안에 poll()이 호출되지 않으면, Consumer가 죽었다고 판단해서 할당된 파티션이 revoke되고 rebalance가 발생한다.
환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10
MessageListenerContainer 생성
@Configuration
public class MessageListenerContainerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaMessageListenerContainer messageListenerContainer() {
ContainerProperties containerProps = new ContainerProperties("test-event");
containerProps.setGroupId("test-event-container");
containerProps.setAckMode(ContainerProperties.AckMode.BATCH);
containerProps.setMessageListener(new DefaultMessageListener());
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(containerFactory(), containerProps);
container.setAutoStartup(false); // 자동으로 시작하는 것이 아닌, 내가 원할 때 실행
return container;
}
private ConsumerFactory containerFactory() {
return new DefaultKafkaConsumerFactory(props());
}
private Map<String, Object> props() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
public class DefaultMessageListener implements MessageListener<String, String> {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(ConsumerRecord<String, String> data) {
log.info("[DefaultMessageListener] data = {}", data.value());
}
}
- containerProps.setGroupId()를 설정해야 Client Id가 발급될 수 있다.
- containerProps.setAckMode()를 통해, AckMode를 설정할 수 있다.
- containerProps.setMessageListener()에 내가 정의한 MessageListener를 등록할 수 있다.
- container.setAutoStartup(false) 설정으로 AutoStart를 끄면, Application 구동 시점이 아닌 start() 메서드가 호출된 시점에서 container가 구동한다.
@Component
public class Advanced2Producer {
@Value("${spring.kafka.topic.test}")
private String topic;
private final KafkaTemplate<String, String> kafkaTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public Advanced2Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void asyncSend(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new KafkaSendCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Success to send message.");
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
log.error("Fail to send message.", failedProducerRecord);
}
});
}
}
@Component
public class Advanced2Consumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}")
public void listen(String message) {
log.info("[Consumer] message = {}", message);
}
}
- 다음과 같이 Producer, Consumer를 작성한다.
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(Advanced2Producer advanced2Producer,
KafkaMessageListenerContainer kafkaMessageListenerContainer) {
return args -> {
kafkaMessageListenerContainer.start();
advanced2Producer.asyncSend("test-message5.");
};
}
}
- kafkaMessageListenerContainer.start()를 통해 Container를 구동할 수 있다.
- ApplicationRunner에 의해 advanced2Producer.asyncSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.
@KafkaListener 사용하기
@Component
public class Advanced2Consumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}", concurrency = "2", clientIdPrefix = "test-id")
public void listen(String message,
ConsumerRecordMetadata metadata) {
log.info("[Consumer] message = {}", message);
log.info("[Consumer] offset = {}", metadata.offset());
}
@KafkaListener(id = "test-event-id", topics = "${spring.kafka.topic.test}", concurrency = "2", clientIdPrefix = "test-id")
public void listen(String message,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
@Header(KafkaHeaders.REPLY_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("[Consumer] message = {}", message);
log.info("[Consumer] timestamp = {}", timestamp);
log.info("[Consumer] partition = {}", partition);
log.info("[Consumer] offset = {}", offset);
}
}
- 다양한 property를 사용할 수 있다.
- ConsumerRecordMetadata metadata를 통해 다양한 정보를 확인할 수 있다.
- @Header(KafkaHeaders.*)을 통해 정보를 가져올 수 있다.
KafkaJsonListenerContainer 사용하기
@Configuration
public class kafkaJsonConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin.NewTopics newTopics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("test-member").build()
);
}
@Bean
public KafkaTemplate<String, Member> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
private ProducerFactory<String, Member> producerFactory() {
return new DefaultKafkaProducerFactory(producerProps());
}
private Map<String, Object> producerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
@Configuration
public class KafkaJsonListenerContainerConfiguration implements KafkaListenerConfigurer {
private final LocalValidatorFactoryBean localValidatorFactoryBean;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public KafkaJsonListenerContainerConfiguration(LocalValidatorFactoryBean localValidatorFactoryBean) {
this.localValidatorFactoryBean = localValidatorFactoryBean;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Member>> kafkaJsonContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Member> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(memberFactory());
return factory;
}
private ConsumerFactory<String, Member> memberFactory() {
return new DefaultKafkaConsumerFactory(
props(),
new StringDeserializer(),
new JsonDeserializer<>(Member.class));
}
private Map<String, Object> props() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(localValidatorFactoryBean);
}
}
- JsonSerializer, JsonDeserializer를 사용하는 KafkaTemplate,KafkaListenerContainerFactory를 생성해서 Bean으로 등록한다.
- DefaultKafkaConsumerFactory를 생성할 때, new JsonDesrializer<>({Type Class}) Class를 지정하지 않으면, 신뢰할 수 없는 도메인 오류가 발생한다
- Validation 적용을 위해 KafkaListnerConfugurer를 상속 받고, confiugurerKafkaListners 메서드를 오버라이드 한 뒤, LocalValidationFactoryBean을 Validator로 설정하여 Spring Validator가 적용되게 한다.
@Component
public class Advanced2Consumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(id = "test-member-id", topics = "test-member")
public void listenMember(@Valid Member member) {
log.info("[Consumer] member = {}", member.toString());
}
}
@Component
public class Advanced2Producer {
@Value("${spring.kafka.topic.test}")
private String topic;
private final KafkaTemplate<String, Member> kafkaMemberTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public Advanced2Producer(KafkaTemplate<String, Member> kafkaMemberTemplate) {
this.kafkaMemberTemplate = kafkaMemberTemplate;
}
public void asyncMemberSend(Member member) {
ListenableFuture<SendResult<String, Member>> future = kafkaMemberTemplate.send("test-member", "member-id", member);
future.addCallback(new KafkaSendCallback<>() {
@Override
public void onSuccess(SendResult<String, Member> result) {
log.info("Success to send message.");
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
log.error("Fail to send message.", failedProducerRecord);
}
});
}
}
- Consumer, Producer를 작성한 후, Application을 실행하면 Cannot convert from [java.lang.String] to [Member.class] ... 오류가 발생한다.
- 해당 오류는 위에서 KafkaListnerContainerFactory를 Bean 등록할 때, kafkaListenerContainerFactory로 등록하지 않아 발생하는 오류이다.
- 해당 오류를 해결하기 위해서는 Consumser에서 @KafkaListener에 containerFactory를 지정해주는 방법이 있다.
@KafkaListener(id = "test-member-id", topics = "test-member", containerFactory = "kafkaJsonContainerFactory")
public class Member {
@Email
private String email;
private int age;
public Member() {
}
public Member(String email, int age) {
this.email = email;
this.age = age;
}
public String getEmail() {
return email;
}
public int getAge() {
return age;
}
public void setEmail(String email) {
this.email = email;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Member{" +
"email='" + email + '\'' +
", age=" + age +
'}';
}
}
@SpringBootApplication
public class KafkaStudyApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(Advanced2Producer advanced2Producer) {
return args -> {
advanced2Producer.asyncMemberSend(new Member("test", 10));
};
}
}
- ApplicationRunner에 의해 advanced2Producer.asyncMemberSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void springkafkastudy.kafkastudy.consumer.Advanced2Consumer.listenMember(springkafkastudy.kafkastudy.model.Member): 1 error(s): [Field error in object 'member' on field 'email': rejected value [test]; codes [Email.member.email,Email.email,Email.java.lang.String,Email]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.email,email]; arguments []; default message [email],[Ljavax.validation.constraints.Pattern$Flag;@1be64748,.*]; default message [올바른 형식의 이메일 주소여야 합니다]]
- email이 이메일 주소 형식이 아니기 때문에 발생하는 오류이다.
- advanced2Producer.asyncMemberSend(new Member("test@test.com", 10));으로 수정하면 다음과 같은 로그를 확인할 수 있다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] 간단 성능 모니터링 (0) | 2022.12.02 |
---|---|
[Spring + Kafka] AdminKafka 사용하기 (0) | 2022.12.01 |
[Spring + Kafka] Publish Messages (0) | 2022.11.28 |
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 (0) | 2022.11.27 |
[Spring + Kafka] 간단 실습 (0) | 2022.11.26 |