메시지 발행 방법
- KafkaTemplate 사용
- ProducerFactory 클래스를 사용해서 생성한다.
- 트랜잭션을 사용하지 않는 경우, Singleton으로 생성한다.
- flush()를 사용할 경우, 같은 Producer를 사용하는 다른 Thread에서 지연 현상이 발생할 수 있다.
* 2.3 이후 부터는 producerPerThread 속성이 추가되어, true로 설정할 경우, 각 Thread에서 별도의 생성자를 만들고 캐시 처리한다. -> flush() 사용에도 지연 현상이 발생하지 않는다.
- 2.5.10 이후에는 설정을 업데이트하거나, 제거할 수 있는 Method를 제공한다.
- SSL 키 변경 등에서 유용할게 사용할 수 있다.
- reset()을 사용하면, 기존 Producer는 닫고 새로운 설정으로 Producer를 생성한다.
- Transactional <-> Non-Transactional로의 변경은 불가능하다.
- 기본적으로 비동기적으로 처리한다.
- 동기적으로 처리할 수 있으나, kafka의 목적이 빠른 Stream 처리이므로 사용하지 않는 것이 좋다.
- 동기적으로 처리할 수 있으나, kafka의 목적이 빠른 Stream 처리이므로 사용하지 않는 것이 좋다.
- 발송 방법
- Message<?> 객체를 이용한다.
- 메시지에 헤더로 정보를 제공할 수 있다.
- ProducerRecode<K,V> 객체를 이용한다.
- Topic, Partition, Offset을 설정 후 전송한다.
- 2.5 이후 버전부터 KafkaSendCallback 인터페이스를 사용하면, 실패한 메시지까지 확인할 수 있다.
- ProducerFactory 클래스를 사용해서 생성한다.
- RoutingKafkaTemplate 사용
- 2.5 이후 버전부터 지원한다.
- 전송하는 Topic 별로 옵션을 다르게 설정할 수 있다.
- Topic 명을 Regular Expression(정규식)으로 표현할 수 있다.
- Topic 명을 Regular Expression(정규식)으로 표현할 수 있다.
- transactions, execute, flush, metric 커맨드를 지원하지 않는다.
- ReplyingKafkaTemplate 사용
- 2.1.3 버전부터 지원한다.
- Consumer가 특정 데이터를 전달 받았는지 여부를 확인할 수 있다.
- 3개의 Header가 기본으로 정의된다.
- KafkaHeaders.CORRELATION_ID: 요청과 응답을 연결시키는데 사용한다.
- KafkaHeaders.REPLY_TOPIC: 응답 토픽
- KafkaHeaders.REPLY_PARTITION: 응답 토픽의 파티션
- AggregatingReplyingKafkaTemplate 사용
- 여러 응답을 한 번에 처리할 수 있다.
환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10
KafkaTemplate 사용
@Component
public class AdvancedProducer {
@Value("${spring.kafka.topic.test}")
private String topic;
private final KafkaTemplate<String, String> kafkaTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public AdvancedProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void syncSend(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
try {
log.info("Success to send message.");
SendResult<String, String> result = future.get(5, TimeUnit.SECONDS);
log.info("success result {}", result.getProducerRecord());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
public void asyncSend(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new KafkaSendCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Success to send message.");
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
log.error("Fail to send message.", failedProducerRecord);
}
});
}
}
- ListenableFuture의 get()을 사용하여, 메시지 발송 결과를 얻고 log를 찍을 때 까지 대기하는 동기적으로 사용할 수 있다.
- ListenableFuture의 addCallBack()을 사용하여, 비동기적으로 메시지 발송 결과를 처리할 수 있다.
RountingKafkaTemplate 사용
- KafkaTemplate과 유사하나, ProducerFactory가 2 개 이상 필요하다.
- ProducerFactory 1 개를 가지고 생성할 수 있으나, 이 경우에는 KafkaTemplate과 똑같이 동작한다.
@Configuration
public class RoutingKafkaTemplateConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public RoutingKafkaTemplate routingKafkaTemplate() {
return new RoutingKafkaTemplate(factories());
}
private Map<Pattern, ProducerFactory<Object, Object>> factories() {
Map<Pattern, ProducerFactory<Object, Object>> factories = new LinkedHashMap<>();
factories.put(Pattern.compile(".*"), defaultProducerFactory());
return factories;
}
private ProducerFactory<Object, Object> defaultProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerProps());
}
private Map<String, Object> producerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
- RoutingKafkaTemplate의 설정 방법이다.
- Pattern.compile을 사용하여 Topic명을 정규식으로 설정할 수 있다.
- Pattern.compile(".*")을 설정하여, 모든 Topic에 해당 설정을 적용할 수 있다.
@Component
public class AdvancedProducer {
@Value("${spring.kafka.topic.test}")
private String topic;
private final RoutingKafkaTemplate routingKafkaTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public AdvancedProducer(RoutingKafkaTemplate routingKafkaTemplate) {
this.routingKafkaTemplate = routingKafkaTemplate;
}
public void routingSend(String message) {
routingKafkaTemplate.send(topic, message);
}
}
@Component
public class AdvancedConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = "${spring.kafka.topic.test}")
public void listen(String message) {
log.info("[Consumer] message = {}", message);
}
}
- 다음과 같이 Producer, Consumer를 생성한다.
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(AdvancedProducer advancedProducer) {
return args -> {
advancedProducer.routingSend("test message.");
};
}
}
- ApplicationRunner에 의해 advancedProducer.routingSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.
ReplyKafkaTemplate 사용
- 생성 시, Key와 Value 값 + 응답 타입까지 지정해야 한다.
- 생성 시, MessageListener도 같이 지정해야 한다.
- 내가 메시지를 발행하고, 발행한 메시지를 처리한 서비스의 응답 메시지를 확인할 수 있다.
@Configuration
public class ReplyKafkaTemplateConfiguration {
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(ProducerFactory producerFactory,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("test-replies");
container.getContainerProperties().setGroupId("test-replies-container-id");
return container;
}
}
- ProducerFactory + MessageListenerContainer를 사용하여 ReplyingKafkaTemplate을 생성한다.
- MessageListenerContainer에 응답 값을 담을 Topic을 설정해야 한다.
@Component
public class AdvancedProducer {
private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public AdvancedProducer(ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
}
public void replyingSend(String topic, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
try {
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
log.info("[ConsumerRecord] {}", consumerRecord.value());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
@Component
public class AdvancedConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@SendTo
@KafkaListener(id = "test-request-id", topics = "test-request")
public String listenRequest(String message) {
log.info("[Consumer] message = {}", message);
return "reply test";
}
}
- Producer, Consumer를 만든다.
- Consumer에 @SendTo 어노테이션을 붙여주어야 한다.
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(AdvancedProducer advancedProducer) {
return args -> {
advancedProducer.replyingSend("test-request", "test-message4.");
};
}
}
- ApplicationRunner에 의해 advancedProducer.replyingSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] AdminKafka 사용하기 (0) | 2022.12.01 |
---|---|
[Spring + Kafka] Consume Messages (0) | 2022.11.29 |
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 (0) | 2022.11.27 |
[Spring + Kafka] 간단 실습 (0) | 2022.11.26 |
[Kafka] Exactly Once Semantics(EOS) (0) | 2022.10.17 |