본문 바로가기

Kafka

[Kafka] Consumer (2)

지난 글 요약

  1. Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 Poll을 하고 가져온 위치를 나타내는 offset 정보를 __consumer_offsets Topic에 저장하여 관리한다.
  2. 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성하고 Consumer Group의 Consumer들은 작업량을 어느 정도 균등하게 분할한다. 이때, 데이터는 Partition에 균등하게 존재해야 한다.
  3. 동일한 Topic에서 Consume 하는 여러 Consumer Group이 존재할 수 있다.

Partition Assignment

Partition을 Consumer에게 Assign(할당)할 때

  • 하나의 Partition은 지정된 Consumer Group 내의 하나의 Consumer만 사용한다.
  • 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다. -> Partition 수가 변경되면 데이터의 저장되는 위치가 달라지므로 그 이전까지
  • Consumer의 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식을 조정할 수 있다.
  • Consumer Group은 Group Coordinator 프로세스에 의해서 관리된다.

* Consumer Group을 등록하고 Consumer 전체에 Partition을 할당하는 프로세스

  1. Consumer 등록 및 Group Coordinator 선택
    • 각 Consumer는 group.id로 Kafka 클러스터에 자신을 등록하면 Kafka는 Consumer Group을 만들고 Consumer의 모든 Offset은 __consumer_offsets Topic에 저장된다.
    • hash(group.id) % offsets.topic.num.partitions 수식을 사용하여, group.id가 저장될 __consumer_offsets의 Partition을 결정하고 해당 Partition을 가지고 있는 Broker는 Croup Coordinator가 된다.
  2. JoinGroup 요청 순서에 따라 Consumer 나열
    • JoinGroup 요청이란, Consumer들이 Broker에 붙게 되면 Kafka에 요청하는 요청 값
    • Group Coordinator는 Group의 Consumers 카탈로그를 생성하기 전에 Consumers의 JoinGroup 요청에 대해 group.initail.rebalance.delay.ms(deafult 3초)를 대기한다.
    • 이후, Group Corrdinator는 JoinGroup 요청을 수신하는 순서대로 Consumer를 나열해서 제일 먼저 JoinGroup을 요청한 Consumer에게 리스트를 전달한다.
  3. Group Leader 결정 및 Partition 할당
    • JoinGroup 요청을 최초로 보낸 Consumer는 Group Leader로 지정되며, Group Corrdinatro로부터 Consumer 목록을 받는다.
    • Group Leader는 구성된 partition.assignment.startegy를 사용해서 각 Consumer에게 Partition을 할당한다.
    • Partition보다 Consumer가 더 많은 경우 Partition을 할당받지 못한 Consumer가 발생할 수 있다.
  4. Consumer -> Partition 매핑 정보를 Group Coordinator에게 전송
    • Group Leader는 Consumer -> Partition 매핑 정보를 Group Coordinator에게 다시 보내고 Group Coordinator는 받은 매핑 정보를 메모리에 캐시하고 ZooKeeper에 유지한다.
  5. Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보냄
    • Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보내고 각 Consumer는 할당된 Partition에서 Consume을 시작한다.

* Kafka는 가능한 한 많은 계산을 클라이언트에서 수행하도록 하여, Broker의 부담을 줄이기 때문에 Consumer(Group Leader)에게 Partition을 할당하도록 한다.


Consumer Rebalancing 

Consumer Rebalancing은 Rebalancing Trigger가 발동하는 순간 바로 발생한다. -> 불필요한 Rebalancing을 피할 수 없다.

* Rebalancing Trigger

  • Consumer가 Consumer Group에서 탈퇴할 때
  • 신규 Consumer가 Consumer Group에 합류할 때
  • Consumer가 Topic 구독을 변경할 때
  • Consumer Group이 Topic 메타 데이터의 변경 사항을 인지할 때 (Ex: Partition 증가)

* Rebalancing Process

  1. Group Coordinator는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보낸다.
  2. Consumer가 일시 중지하고 Offset을 Commit한다.
  3. Rebalancing Trigger의 동작이 이루어진다.
  4. Partition 재할당
  5. Consumer는 새로 할당받은 Partition에서 다시 Consume을 시작한다.

* Consumer Heartbeats

  • Consumer 장애를 인지하기 위해서 사용하는 값
  • Consumer는 poll()과 별도로 백그라운드 Thread에서 Heartbeats를 보낸다.
    • heartbeat.interval.ms (default: 3초) 마다 보낸다.
  • session.timeout.ms (default: 10초) 동안 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제된다.
  • poll() 역시 Heartbeats와 상관 없이 주기적으로 호출되어야 한다.
    • max.poll.interval.ms (default: 5분)

 

Consumer Rebalancing이 발생할 경우 Consumer들은 메시지를 Consume 하지 못하기 때문에 불필요한 Rebalancing은 반드시 피해야 한다.

* 과도한 Rebalancing을 피하는 방법 -> 성능 최적화에 필수

  • Consumer Group 멤버 고정
    • Group의 각 Consumer에게 고유한 group.instance.id를 할당한다.
    • Consumer는 LeaveGroupRequest를 사용하지 않아야 한다.
    • Rejoin은 알려진 group.instance.id에 대한 Rebalance Trigger를 하지 않는다.
    • Consumer 가 잠시 죽었을 때. Group 탈퇴 / Rejoin으로 인한 Rebalance 방지
  • session.timeout.ms 튜닝
    • heartbeat.interval.mssession.timeout.ms의 1/3으로 설정한다.
    • group.min.session.timeout.ms (default : 6초), group.max.sesson.timeout.ms (default: 5 분) 사이의 값을 사용한다.
    • 장점: Consumer가 Rejoin(재가입) 할 수 있는 더 많은 시간을 줄 수 있다.
    • 단점: Consumer 장애를 감지하는 데 더 오랜 시간이 걸린다.
  • max.poll.interval.ms 튜닝
    • 데이터를 처리하는데 너무 많은 시간이 소요되면 Consumer Group에서 방출되는 현상이 발생하기 때문에 Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간을 제공한다. 단, 너무 크면 안 된다.

'Kafka' 카테고리의 다른 글

[Kafka] Replication (2)  (1) 2022.10.13
[Kafka] Replication (1)  (0) 2022.10.12
[Kafka] Consumer (1)  (0) 2022.10.10
[Kafka] Producer (2)  (0) 2022.10.09
[Kafka] Producer (1)  (0) 2022.10.09