본문 바로가기

Kafka

[Spring + Kafka] AdminKafka 사용하기

Spring이 제공하는 Adminkafka를 사용하여 Topic, 설정 정보 등의 다양한 Kafka 정보를 조회 및 변경, 삭제할 수 있다.


환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10

Topic 정보 확인하기

@Service
public class KafkaManager {

    private final AdminClient adminClient;
    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public KafkaManager(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    public void describeTopicConfigs() throws ExecutionException, InterruptedException {
        Collection<ConfigResource> resources = List.of(
                new ConfigResource(ConfigResource.Type.TOPIC, "test-event")
        );
        DescribeConfigsResult result = adminClient.describeConfigs(resources);
        log.info("result = {}", result.all().get());
    }
}
  • .describeConfigs()를 사용해서 설정 정보를 확인할 수 있다.
  • ConfigResource의 타입을 Topic으로 설정하고 지정한 Topic의 설정 정보를 확인할 수 있다. 
  • Spring에서 지원하는 ConfigResource.Type의 종류는 다음과 같다.
    • BROKER_LOGGER
    • BROKER
    • TOPIC
    • UNKOWN

 

설정 정보 변경하기

@Service
public class KafkaManager {

    private final AdminClient adminClient;
    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public KafkaManager(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
    
    public void describeTopicConfigs() throws ExecutionException, InterruptedException {
        Collection<ConfigResource> resources = List.of(
                new ConfigResource(ConfigResource.Type.TOPIC, "test-event")
        );
        DescribeConfigsResult result = adminClient.describeConfigs(resources);
        log.info("result = {}", result.all().get());
    }

    public void changeConfig() throws ExecutionException, InterruptedException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-event");
        Map<ConfigResource, Collection<AlterConfigOp>> ops = new HashMap<>();
        ops.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "6000"), AlterConfigOp.OpType.SET)));
        adminClient.incrementalAlterConfigs(ops);

        describeTopicConfigs();
    }
}
  • .incrementalAlterConfigs()를 사용하여 설정 정보를 변경할 수 있다.
  • AlterConfigOp.OpType.*을 지정하여 설정 정보를 조작할 수 있다.
  • Spring에서 제공하는 AlterConfigOp.OpType의 종류는 다음과 같다.
    • SET
    • DELETE
    • APPEND
    • SUBTRACT

  • 위 코드를 실행하면, test-event Topic의 retention_ms 정보가 6000으로 변경된 것을 확인할 수 있다. 

 

Record 삭제

public class KafkaManager {

    private final AdminClient adminClient;
    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public KafkaManager(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    public void deleteRecords() throws ExecutionException, InterruptedException {
        TopicPartition topicPartition = new TopicPartition("test-event", 0);
        Map<TopicPartition, RecordsToDelete> target = new HashMap<>();
        target.put(topicPartition, RecordsToDelete.beforeOffset(1));

        DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(target);
        Map<TopicPartition, KafkaFuture<DeletedRecords>> result = deleteRecordsResult.lowWatermarks();
        for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> futureEntry : result.entrySet()) {
            log.info("[deleteRecords] futureEntry key = {}, value = {}", futureEntry.getKey().topic(), futureEntry.getValue().get());
        }
    }
}
  • .deleteRecords()를 사용해서 TopicPartition의 Record를 지울 수 있다.
  • Topic의 파티션을 지정하고, 지정한 Offset 이전 모든 Record를 삭제할 수 있다.

 

ConsumerGroup 조회하기

@Service
public class KafkaManager {

    private final AdminClient adminClient;
    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public KafkaManager(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    public void findAllConsumerGroups() throws ExecutionException, InterruptedException {
        ListConsumerGroupsResult result = adminClient.listConsumerGroups();
        Collection<ConsumerGroupListing> groups = result.valid().get();
        for (ConsumerGroupListing group : groups) {
            log.info("[findAllConsumerGroups] group = {}", group);
        }
    }
}
  • .listConsumerGroups()로 ConsumerGroup을 조회할 수 있다.

 

ConsumerGroup 삭제하기

@Service
public class KafkaManager {

    private final AdminClient adminClient;
    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public KafkaManager(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    public void deleteConsumerGroup() {
        adminClient.deleteConsumerGroups(List.of("test"));
    }
}
  • state=Optional[Empty])인 ConsumerGroup만 제거할 수 있다.
  • state=Optional([Empty])가 아닌 ConsumerGroup을 제거하려고 시도하면 GroupNotEmptyException이 발생한다.

 

Topic Partition Offset 조회

public void findAllOffset() throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetSpec> target = new HashMap<>();
    target.put(new TopicPartition("test-event", 0), OffsetSpec.latest());

    ListOffsetsResult result = adminClient.listOffsets(target);
    for (TopicPartition topicPartition : target.keySet()) {
        log.info("topic = {}, partition = {}, offsets = {}", topicPartition.topic(), topicPartition.partition(), result.partitionResult(topicPartition).get());
    }
}
  • .listOffsets()를 사용하여 Topic Partition의 Offset 정보를 조회할 수 있다.