본문 바로가기

Kafka

[Spring + Kafka] Topic 생성 및 조회, 삭제하기

Topic 생성 시 고려해야 할 점

  • Topic 명
    • Topic 명은 한 번 정하면, 바꾸기가 매우 어렵기 때문에 Rule을 적용한 패턴을 통해 의미를 부여한다.

  • Topic의 파티션 개수 계산
    • 파티션 개수: 1초 당 메시지 발행 수 / Consumer Thread 1개가 1초당 처리하는 메시지 수
    • 파티션 개수를 늘릴 수는 있지만 줄일 수는 없다.

  • Retention 시간
    • kafka borker가 메시지를 어느 시간까지 보관할지 시간이나 byte에 따라서 초과 시, 데이터를 삭제한다.
    • 디스크 크기와 데이터의 중요성에 따라 판단한다.

환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10

 

Application.yml 설정

spring:
  kafka:
    bootstrap-servers: {host}:{port}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test # consumerGroupId
      auto-offset-reset: latest 
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "*"
    topic:
      test: test-event

 

Kafka Topic 생성

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic newTopic() {
        return TopicBuilder.name("test-event2").build();
    }
}
  • AdminClient + TopicBuilder를 사용한 Topic 생성
  • Application 실행 시, 다음과 같이 "test-event2" Topic이 생성된 것을 확인할 수 있다.

 

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public KafkaAdmin.NewTopics newTopics() {
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name("test-event2-1").build(),
                TopicBuilder.name("test-event2-2")
                        .partitions(3)
                        .replicas(1)
                        .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(1000 * 60 * 60))
                        .build()
        );
    }
}

 

  • NewTopics를 사용한 다중 Topic 생성
  • TopicBuilder를 사용하면, replica 개수, partition 개수, config 등 다양한 설정을 할 수 있다.
  • Application 실행 시, 다음과 같이 "test-event2-1", "test-event2-2" Topic이 생성된 것을 확인할  수 있다.

 

* 위와 같은 방법을 사용해서 Topic 생성 시, Kafka 서버에 존재하지 않는 Topic일 경우에만 Topic을 생성한다.

 

Kafka Topic 정보 조회 및 삭제

정보 조회

@Configuration
public class KafkaConfiguration {

    @Bean
    public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}
@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(AdminClient adminClient) {
		return args -> {
			Map<String, TopicListing> topics = adminClient.listTopics().namesToListings().get();
			for (String topicName : topics.keySet()) {
				log.info("topicName = {},     topicListing = {}", topicName, topics.get(topicName));

				Map<String, TopicDescription> description = adminClient.describeTopics(Collections.singleton(topicName)).allTopicNames().get();
				log.info("topicName = {},     description = {}", topicName, description.get(topicName));
			}
		};
	}

}
  • adminClient를 사용하여 Kafka Broker에 생성되어 있는 Topic의 정보를 확인할 수 있다.
  • Application 실행 시, 다음과 같은 로그를 확인할 수 있다.
topicName = test-event2, description = (name=test-event2, internal=false, partitions=(partition=0, leader={host}:9092 (id: 0 rack: null), replicas={host}:9092 (id: 0 rack: null), isr={host}:9092 (id: 0 rack: null)), authorizedOperations=null)
topicName = test-event2-1, topicListing = (name=test-event2-1, topicId=fZlGrdFPSdKux-2BYXyNLQ, internal=false)
...

 

Topic 삭제

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(AdminClient adminClient) {
		return args -> {
			Map<String, TopicListing> topics = adminClient.listTopics().namesToListings().get();
			for (String topicName : topics.keySet()) {
				if (!topics.get(topicName).isInternal()) {
					adminClient.deleteTopics(Collections.singleton(topicName));
				}
			}
		};
	}

}
  • adminClient.deleteTopics()를 사용해서 Topic을 삭제할 수 있다.
  • internal이 아닌 Topic만 제거하는 것이 안전하다.
  • 다음과 같이 Topic이 제거된 것을 확인할 수 있다.




'Kafka' 카테고리의 다른 글

[Spring + Kafka] Consume Messages  (0) 2022.11.29
[Spring + Kafka] Publish Messages  (0) 2022.11.28
[Spring + Kafka] 간단 실습  (0) 2022.11.26
[Kafka] Exactly Once Semantics(EOS)  (0) 2022.10.17
[Kafka] Kafka Log File  (0) 2022.10.15