본문 바로가기

Kafka

[Spring + Kafka] 간단 성능 모니터링 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 코드를 사용한 Metrics 확인 @Bean public ApplicationRunner runner(KafkaTemplate kafkaTemplate, KafkaListenerEndpointRegistry registry) { return args -> { Map producerMetrics = kafkaTemplate.metrics(); for (MetricName metricName : producerMetrics.keySet()) { log.info("metricName = {}, value = {}", metricName.description(), producerMetrics.get(metricName).. 더보기
[Spring + Kafka] AdminKafka 사용하기 Spring이 제공하는 Adminkafka를 사용하여 Topic, 설정 정보 등의 다양한 Kafka 정보를 조회 및 변경, 삭제할 수 있다. 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 Topic 정보 확인하기 @Service public class KafkaManager { private final AdminClient adminClient; private final Logger log = LoggerFactory.getLogger(this.getClass()); public KafkaManager(KafkaAdmin kafkaAdmin) { this.adminClient = AdminClient.create(kafkaAdmin.getConfigurati.. 더보기
[Spring + Kafka] Publish Messages 메시지 발행 방법 KafkaTemplate 사용 ProducerFactory 클래스를 사용해서 생성한다. 트랜잭션을 사용하지 않는 경우, Singleton으로 생성한다. flush()를 사용할 경우, 같은 Producer를 사용하는 다른 Thread에서 지연 현상이 발생할 수 있다. * 2.3 이후 부터는 producerPerThread 속성이 추가되어, true로 설정할 경우, 각 Thread에서 별도의 생성자를 만들고 캐시 처리한다. -> flush() 사용에도 지연 현상이 발생하지 않는다. 2.5.10 이후에는 설정을 업데이트하거나, 제거할 수 있는 Method를 제공한다. SSL 키 변경 등에서 유용할게 사용할 수 있다. reset()을 사용하면, 기존 Producer는 닫고 새로운 설정으로 Pr.. 더보기
[Spring + Kafka] 간단 실습 * 본 글에서는 Spring + Kafka를 매우 간단하게 적용해 볼 예정이다. kafka 간단 정리 Porudcer는 Topic에 데이터를 적재하고, Consumer는 Topic에서 데이터를 가져와 처리한다. Topic은 키 값에 따라 여러 파티션으로 나눠서 적재(파티셔닝)되고, Consumer Group의 포함된 하나의 Comsumer가 하나의 Partition을 담당하여 데이터를 처리한다. Consumer가 데이터를 처리하면, ACK를 날려 데이터를 처리했다고 표시한다. ACK를 날리는 방법은 여러 가지가 존재한다. Kafka의 자세한 설명은 다음 링크를 통해서 확인할 수 있다.https://yeongchan1228.tistory.com/52 [Kafka] Apache Kafka란? 데이터가 흐르는.. 더보기
[Kafka] Exactly Once Semantics(EOS) * Producer가 데이터를 전달하는 방식 At-Most-Once Semantics(최대 한 번) 확인 시간이 초과되거나 오류가 반환될 때, Producer가 재시도를 하지 않으면, 데이터가 Kafka Topic에 기록되지 않아 Consumer에 전달되지 않을 수 있다. 중복 가능성을 피하기 위해서 때때로 허용한다. At-Least-Once Semantics(최소 한 번) Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면, 메시지가 Kafka Topic에 최소 한 번 작성되었음을 보장한다. ack가 시간 초과되거나 오류를 수신하면, 메시지가 Kafka Topic에 기록되지 않았다고 가정하고 데이터를 재전송할 수 있다. Broker가 ack를 보내기 직전에 실패했지만, .. 더보기
[Kafka] Kafka Log File * Topic, Partition, Segment 구조(Multi Partition을 통한 병렬 처리 구조일 경우) Partition은 Broker들에게 분산되며, 각 Partition은 Segment File들로 구성된다. Rolling Strategy: log.segment.bytes(default 1GB), log.roll.hours(default 168 hours) Log Segment File Data File이라고 부른다. 첫 번째로 저장되는 데이터의 Offset이 파일명이 된다. Segment File이 생성되는 위치는 각 Broker의 server.properties 파일 안의 log.dirs 파라미터로 정의한다. -> 콤마를 사용하여 여러 디렉터리 지정 가능 log.dirs=/data/k.. 더보기
[Kafka] Partition Assignment Strategy * Partition Assignment Strategy 설정은 Consumer의 설정 파라미터 중에서 partition.assignment.strategy 값을 아래 표와 같이 변경하여 할당 방식을 조정할 수 있다. org.apache.kafka.clients.consumer.RangeAssignor (default) Topic 별로 작동하는 Default Assignor org.apache.kafka.clints.consumer.RoundRobinAssignor Round Robin 방식으로 Consumer에게 Partition 할당 org.apache.kafka.clients.consumer.StickyAssignor 최대한 많이 기존의 Partition 할당을 유지하면서, 최대 균형을 이루는 할당.. 더보기
[Kafka] Replication (2) In-Sync Replicas 리스트 관리 Leader가 관리한다. 메시지가 ISR 리스트의 모든 Replica에서 수신되면 Commit된 것으로 간주한다. Kafka Cluster의 Controller가 모니터링하는 ZooKeeper의 ISR 리스트에 대한 변경 사항은 Leader가 관리한다. Follower가 실패하는 경우, Leader에 의해 ISR 리스트에서 삭제되고 Leader는 새로운 ISR을 사용하여 Commit한다. (1)에서 replica.lag.time.max.ms 이내에 Follower가 fetch하지 않으면, ISR에서 제거한다. Broker 1이 ZooKeeper에게 ISR 변경을 알린다. ZooKeeper는 Partition Metadata에 대한 변경 사항을 Controller.. 더보기