본문 바로가기

Kafka

[Spring + Kafka] Publish Messages

메시지 발행 방법

  • KafkaTemplate 사용
    • ProducerFactory 클래스를 사용해서 생성한다.
      1. 트랜잭션을 사용하지 않는 경우, Singleton으로 생성한다.
      2. flush()를 사용할 경우, 같은 Producer를 사용하는 다른 Thread에서 지연 현상이 발생할 수 있다.
        * 2.3 이후 부터는 producerPerThread 속성이 추가되어, true로 설정할 경우, 각 Thread에서 별도의 생성자를 만들고 캐시 처리한다. -> flush() 사용에도 지연 현상이 발생하지 않는다.
    • 2.5.10 이후에는 설정을 업데이트하거나, 제거할 수 있는 Method를 제공한다.
      1. SSL 키 변경 등에서 유용할게 사용할 수 있다.
      2. reset()을 사용하면, 기존 Producer는 닫고 새로운 설정으로 Producer를 생성한다.
      3.  Transactional <-> Non-Transactional로의 변경은 불가능하다.
    • 기본적으로 비동기적으로 처리한다.
      1. 동기적으로 처리할 수 있으나, kafka의 목적이 빠른 Stream 처리이므로 사용하지 않는 것이 좋다.
    • 발송 방법
      1. Message<?> 객체를 이용한다.
      2. 메시지에 헤더로 정보를 제공할 수 있다.
      3. ProducerRecode<K,V> 객체를 이용한다.
      4. Topic, Partition, Offset을 설정 후 전송한다.
    • 2.5 이후 버전부터 KafkaSendCallback 인터페이스를 사용하면, 실패한 메시지까지 확인할 수 있다.

  • RoutingKafkaTemplate 사용
    • 2.5 이후 버전부터 지원한다.
    • 전송하는 Topic 별로 옵션을 다르게 설정할 수 있다.
      1. Topic 명을 Regular Expression(정규식)으로 표현할 수 있다.
    • transactions, execute, flush, metric 커맨드를 지원하지 않는다.

  • ReplyingKafkaTemplate 사용
    • 2.1.3 버전부터 지원한다.
    • Consumer가 특정 데이터를 전달 받았는지 여부를 확인할 수 있다.
    • 3개의 Header가 기본으로 정의된다.
      1. KafkaHeaders.CORRELATION_ID: 요청과 응답을 연결시키는데 사용한다.
      2. KafkaHeaders.REPLY_TOPIC: 응답 토픽
      3. KafkaHeaders.REPLY_PARTITION: 응답 토픽의 파티션

  • AggregatingReplyingKafkaTemplate 사용
    • 여러 응답을 한 번에 처리할 수 있다.

환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10

 

KafkaTemplate 사용

@Component
public class AdvancedProducer {

    @Value("${spring.kafka.topic.test}")
    private String topic;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public AdvancedProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void syncSend(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        try {
            log.info("Success to send message.");
            SendResult<String, String> result = future.get(5, TimeUnit.SECONDS);
            log.info("success result {}", result.getProducerRecord());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public void asyncSend(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new KafkaSendCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Success to send message.");
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ProducerRecord<Object, Object> failedProducerRecord = ex.getFailedProducerRecord();
                log.error("Fail to send message.", failedProducerRecord);
            }
        });
    }
}
  • ListenableFuture의 get()을 사용하여, 메시지 발송 결과를 얻고 log를 찍을 때 까지 대기하는 동기적으로 사용할 수 있다.
  • ListenableFuture의 addCallBack()을 사용하여, 비동기적으로 메시지 발송 결과를 처리할 수 있다.

 

RountingKafkaTemplate 사용

  • KafkaTemplate과 유사하나, ProducerFactory가 2 개 이상 필요하다.
  • ProducerFactory 1 개를 가지고 생성할 수 있으나, 이 경우에는 KafkaTemplate과 똑같이 동작한다.
@Configuration
public class RoutingKafkaTemplateConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public RoutingKafkaTemplate routingKafkaTemplate() {
        return new RoutingKafkaTemplate(factories());
    }

    private Map<Pattern, ProducerFactory<Object, Object>> factories() {
        Map<Pattern, ProducerFactory<Object, Object>> factories = new LinkedHashMap<>();
        factories.put(Pattern.compile(".*"), defaultProducerFactory());
        return factories;

    }

    private ProducerFactory<Object, Object> defaultProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps());

    }

    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

}
  • RoutingKafkaTemplate의 설정 방법이다.
  • Pattern.compile을 사용하여 Topic명을 정규식으로 설정할 수 있다.
  • Pattern.compile(".*")을 설정하여, 모든 Topic에 해당 설정을 적용할 수 있다.

@Component
public class AdvancedProducer {

    @Value("${spring.kafka.topic.test}")
    private String topic;
    private final RoutingKafkaTemplate routingKafkaTemplate;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public AdvancedProducer(RoutingKafkaTemplate routingKafkaTemplate) {
        this.routingKafkaTemplate = routingKafkaTemplate;
    }

    public void routingSend(String message) {
        routingKafkaTemplate.send(topic, message);
    }
}

@Component
public class AdvancedConsumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = "${spring.kafka.topic.test}")
    public void listen(String message) {
        log.info("[Consumer] message = {}", message);
    }
}
  • 다음과 같이 Producer, Consumer를 생성한다.

 

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(AdvancedProducer advancedProducer) {
		return args -> {
			advancedProducer.routingSend("test message.");
		};
	}
}
  • ApplicationRunner에 의해 advancedProducer.routingSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.

 

 

ReplyKafkaTemplate 사용

  • 생성 시, Key와 Value 값 + 응답 타입까지 지정해야 한다.
  • 생성 시, MessageListener도 같이 지정해야 한다.
  • 내가 메시지를 발행하고, 발행한 메시지를 처리한 서비스의 응답 메시지를 확인할 수 있다.
@Configuration
public class ReplyKafkaTemplateConfiguration {

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(ProducerFactory producerFactory,
                                                                               ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("test-replies");
        container.getContainerProperties().setGroupId("test-replies-container-id");
        return container;
    }

}
  • ProducerFactory + MessageListenerContainer를 사용하여 ReplyingKafkaTemplate을 생성한다.
  • MessageListenerContainer에 응답 값을 담을 Topic을 설정해야 한다.
@Component
public class AdvancedProducer {

    private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public AdvancedProducer(ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate) {
        this.replyingKafkaTemplate = replyingKafkaTemplate;
    }

    public void replyingSend(String topic, String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
        try {
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            log.info("[ConsumerRecord] {}", consumerRecord.value());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

@Component
public class AdvancedConsumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @SendTo
    @KafkaListener(id = "test-request-id", topics = "test-request")
    public String listenRequest(String message) {
        log.info("[Consumer] message = {}", message);
        return "reply test";
    }

}
  • Producer, Consumer를 만든다.
  • Consumer에 @SendTo 어노테이션을 붙여주어야 한다.

 

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(AdvancedProducer advancedProducer) {
		return args -> {
			advancedProducer.replyingSend("test-request", "test-message4.");
		};
	}
}
  • ApplicationRunner에 의해 advancedProducer.replyingSend를 호출하면, 다음과 같은 로그를 확인할 수 있다.