Spring이 제공하는 Adminkafka를 사용하여 Topic, 설정 정보 등의 다양한 Kafka 정보를 조회 및 변경, 삭제할 수 있다.
환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10
Topic 정보 확인하기
@Service
public class KafkaManager {
private final AdminClient adminClient;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public KafkaManager(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
public void describeTopicConfigs() throws ExecutionException, InterruptedException {
Collection<ConfigResource> resources = List.of(
new ConfigResource(ConfigResource.Type.TOPIC, "test-event")
);
DescribeConfigsResult result = adminClient.describeConfigs(resources);
log.info("result = {}", result.all().get());
}
}
- .describeConfigs()를 사용해서 설정 정보를 확인할 수 있다.
- ConfigResource의 타입을 Topic으로 설정하고 지정한 Topic의 설정 정보를 확인할 수 있다.
- Spring에서 지원하는 ConfigResource.Type의 종류는 다음과 같다.
- BROKER_LOGGER
- BROKER
- TOPIC
- UNKOWN
설정 정보 변경하기
@Service
public class KafkaManager {
private final AdminClient adminClient;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public KafkaManager(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
public void describeTopicConfigs() throws ExecutionException, InterruptedException {
Collection<ConfigResource> resources = List.of(
new ConfigResource(ConfigResource.Type.TOPIC, "test-event")
);
DescribeConfigsResult result = adminClient.describeConfigs(resources);
log.info("result = {}", result.all().get());
}
public void changeConfig() throws ExecutionException, InterruptedException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-event");
Map<ConfigResource, Collection<AlterConfigOp>> ops = new HashMap<>();
ops.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "6000"), AlterConfigOp.OpType.SET)));
adminClient.incrementalAlterConfigs(ops);
describeTopicConfigs();
}
}
- .incrementalAlterConfigs()를 사용하여 설정 정보를 변경할 수 있다.
- AlterConfigOp.OpType.*을 지정하여 설정 정보를 조작할 수 있다.
- Spring에서 제공하는 AlterConfigOp.OpType의 종류는 다음과 같다.
- SET
- DELETE
- APPEND
- SUBTRACT
- 위 코드를 실행하면, test-event Topic의 retention_ms 정보가 6000으로 변경된 것을 확인할 수 있다.
Record 삭제
public class KafkaManager {
private final AdminClient adminClient;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public KafkaManager(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
public void deleteRecords() throws ExecutionException, InterruptedException {
TopicPartition topicPartition = new TopicPartition("test-event", 0);
Map<TopicPartition, RecordsToDelete> target = new HashMap<>();
target.put(topicPartition, RecordsToDelete.beforeOffset(1));
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(target);
Map<TopicPartition, KafkaFuture<DeletedRecords>> result = deleteRecordsResult.lowWatermarks();
for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> futureEntry : result.entrySet()) {
log.info("[deleteRecords] futureEntry key = {}, value = {}", futureEntry.getKey().topic(), futureEntry.getValue().get());
}
}
}
- .deleteRecords()를 사용해서 TopicPartition의 Record를 지울 수 있다.
- Topic의 파티션을 지정하고, 지정한 Offset 이전 모든 Record를 삭제할 수 있다.
ConsumerGroup 조회하기
@Service
public class KafkaManager {
private final AdminClient adminClient;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public KafkaManager(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
public void findAllConsumerGroups() throws ExecutionException, InterruptedException {
ListConsumerGroupsResult result = adminClient.listConsumerGroups();
Collection<ConsumerGroupListing> groups = result.valid().get();
for (ConsumerGroupListing group : groups) {
log.info("[findAllConsumerGroups] group = {}", group);
}
}
}
- .listConsumerGroups()로 ConsumerGroup을 조회할 수 있다.
ConsumerGroup 삭제하기
@Service
public class KafkaManager {
private final AdminClient adminClient;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public KafkaManager(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
public void deleteConsumerGroup() {
adminClient.deleteConsumerGroups(List.of("test"));
}
}
- state=Optional[Empty])인 ConsumerGroup만 제거할 수 있다.
- state=Optional([Empty])가 아닌 ConsumerGroup을 제거하려고 시도하면 GroupNotEmptyException이 발생한다.
Topic Partition Offset 조회
public void findAllOffset() throws ExecutionException, InterruptedException {
Map<TopicPartition, OffsetSpec> target = new HashMap<>();
target.put(new TopicPartition("test-event", 0), OffsetSpec.latest());
ListOffsetsResult result = adminClient.listOffsets(target);
for (TopicPartition topicPartition : target.keySet()) {
log.info("topic = {}, partition = {}, offsets = {}", topicPartition.topic(), topicPartition.partition(), result.partitionResult(topicPartition).get());
}
}
- .listOffsets()를 사용하여 Topic Partition의 Offset 정보를 조회할 수 있다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] 에러 핸들링 (1) | 2022.12.04 |
---|---|
[Spring + Kafka] 간단 성능 모니터링 (0) | 2022.12.02 |
[Spring + Kafka] Consume Messages (0) | 2022.11.29 |
[Spring + Kafka] Publish Messages (0) | 2022.11.28 |
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 (0) | 2022.11.27 |