Topic 생성 시 고려해야 할 점
- Topic 명
- Topic 명은 한 번 정하면, 바꾸기가 매우 어렵기 때문에 Rule을 적용한 패턴을 통해 의미를 부여한다.
- Topic 명은 한 번 정하면, 바꾸기가 매우 어렵기 때문에 Rule을 적용한 패턴을 통해 의미를 부여한다.
- Topic의 파티션 개수 계산
- 파티션 개수: 1초 당 메시지 발행 수 / Consumer Thread 1개가 1초당 처리하는 메시지 수
- 파티션 개수를 늘릴 수는 있지만 줄일 수는 없다.
- Retention 시간
- kafka borker가 메시지를 어느 시간까지 보관할지 시간이나 byte에 따라서 초과 시, 데이터를 삭제한다.
- 디스크 크기와 데이터의 중요성에 따라 판단한다.
환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10
Application.yml 설정
spring:
kafka:
bootstrap-servers: {host}:{port}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test # consumerGroupId
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
spring.json.trusted.packages: "*"
topic:
test: test-event
Kafka Topic 생성
@Configuration
public class KafkaTopicConfiguration {
@Bean
public NewTopic newTopic() {
return TopicBuilder.name("test-event2").build();
}
}
- AdminClient + TopicBuilder를 사용한 Topic 생성
- Application 실행 시, 다음과 같이 "test-event2" Topic이 생성된 것을 확인할 수 있다.
@Configuration
public class KafkaTopicConfiguration {
@Bean
public KafkaAdmin.NewTopics newTopics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("test-event2-1").build(),
TopicBuilder.name("test-event2-2")
.partitions(3)
.replicas(1)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(1000 * 60 * 60))
.build()
);
}
}
- NewTopics를 사용한 다중 Topic 생성
- TopicBuilder를 사용하면, replica 개수, partition 개수, config 등 다양한 설정을 할 수 있다.
- Application 실행 시, 다음과 같이 "test-event2-1", "test-event2-2" Topic이 생성된 것을 확인할 수 있다.
* 위와 같은 방법을 사용해서 Topic 생성 시, Kafka 서버에 존재하지 않는 Topic일 경우에만 Topic을 생성한다.
Kafka Topic 정보 조회 및 삭제
정보 조회
@Configuration
public class KafkaConfiguration {
@Bean
public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
}
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(AdminClient adminClient) {
return args -> {
Map<String, TopicListing> topics = adminClient.listTopics().namesToListings().get();
for (String topicName : topics.keySet()) {
log.info("topicName = {}, topicListing = {}", topicName, topics.get(topicName));
Map<String, TopicDescription> description = adminClient.describeTopics(Collections.singleton(topicName)).allTopicNames().get();
log.info("topicName = {}, description = {}", topicName, description.get(topicName));
}
};
}
}
- adminClient를 사용하여 Kafka Broker에 생성되어 있는 Topic의 정보를 확인할 수 있다.
- Application 실행 시, 다음과 같은 로그를 확인할 수 있다.
topicName = test-event2, description = (name=test-event2, internal=false, partitions=(partition=0, leader={host}:9092 (id: 0 rack: null), replicas={host}:9092 (id: 0 rack: null), isr={host}:9092 (id: 0 rack: null)), authorizedOperations=null)
topicName = test-event2-1, topicListing = (name=test-event2-1, topicId=fZlGrdFPSdKux-2BYXyNLQ, internal=false)
...
Topic 삭제
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(AdminClient adminClient) {
return args -> {
Map<String, TopicListing> topics = adminClient.listTopics().namesToListings().get();
for (String topicName : topics.keySet()) {
if (!topics.get(topicName).isInternal()) {
adminClient.deleteTopics(Collections.singleton(topicName));
}
}
};
}
}
- adminClient.deleteTopics()를 사용해서 Topic을 삭제할 수 있다.
- internal이 아닌 Topic만 제거하는 것이 안전하다.
- 다음과 같이 Topic이 제거된 것을 확인할 수 있다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] Consume Messages (0) | 2022.11.29 |
---|---|
[Spring + Kafka] Publish Messages (0) | 2022.11.28 |
[Spring + Kafka] 간단 실습 (0) | 2022.11.26 |
[Kafka] Exactly Once Semantics(EOS) (0) | 2022.10.17 |
[Kafka] Kafka Log File (0) | 2022.10.15 |