지난 글 요약
- Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 Poll을 하고 가져온 위치를 나타내는 offset 정보를 __consumer_offsets Topic에 저장하여 관리한다.
- 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성하고 Consumer Group의 Consumer들은 작업량을 어느 정도 균등하게 분할한다. 이때, 데이터는 Partition에 균등하게 존재해야 한다.
- 동일한 Topic에서 Consume 하는 여러 Consumer Group이 존재할 수 있다.
Partition Assignment
Partition을 Consumer에게 Assign(할당)할 때
- 하나의 Partition은 지정된 Consumer Group 내의 하나의 Consumer만 사용한다.
- 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다. -> Partition 수가 변경되면 데이터의 저장되는 위치가 달라지므로 그 이전까지
- Consumer의 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식을 조정할 수 있다.
- Consumer Group은 Group Coordinator 프로세스에 의해서 관리된다.
* Consumer Group을 등록하고 Consumer 전체에 Partition을 할당하는 프로세스
- Consumer 등록 및 Group Coordinator 선택
- 각 Consumer는 group.id로 Kafka 클러스터에 자신을 등록하면 Kafka는 Consumer Group을 만들고 Consumer의 모든 Offset은 __consumer_offsets Topic에 저장된다.
- hash(group.id) % offsets.topic.num.partitions 수식을 사용하여, group.id가 저장될 __consumer_offsets의 Partition을 결정하고 해당 Partition을 가지고 있는 Broker는 Croup Coordinator가 된다.
- JoinGroup 요청 순서에 따라 Consumer 나열
- JoinGroup 요청이란, Consumer들이 Broker에 붙게 되면 Kafka에 요청하는 요청 값
- Group Coordinator는 Group의 Consumers 카탈로그를 생성하기 전에 Consumers의 JoinGroup 요청에 대해 group.initail.rebalance.delay.ms(deafult 3초)를 대기한다.
- 이후, Group Corrdinator는 JoinGroup 요청을 수신하는 순서대로 Consumer를 나열해서 제일 먼저 JoinGroup을 요청한 Consumer에게 리스트를 전달한다.
- Group Leader 결정 및 Partition 할당
- JoinGroup 요청을 최초로 보낸 Consumer는 Group Leader로 지정되며, Group Corrdinatro로부터 Consumer 목록을 받는다.
- Group Leader는 구성된 partition.assignment.startegy를 사용해서 각 Consumer에게 Partition을 할당한다.
- Partition보다 Consumer가 더 많은 경우 Partition을 할당받지 못한 Consumer가 발생할 수 있다.
- Consumer -> Partition 매핑 정보를 Group Coordinator에게 전송
- Group Leader는 Consumer -> Partition 매핑 정보를 Group Coordinator에게 다시 보내고 Group Coordinator는 받은 매핑 정보를 메모리에 캐시하고 ZooKeeper에 유지한다.
- Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보냄
- Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보내고 각 Consumer는 할당된 Partition에서 Consume을 시작한다.
* Kafka는 가능한 한 많은 계산을 클라이언트에서 수행하도록 하여, Broker의 부담을 줄이기 때문에 Consumer(Group Leader)에게 Partition을 할당하도록 한다.
Consumer Rebalancing
Consumer Rebalancing은 Rebalancing Trigger가 발동하는 순간 바로 발생한다. -> 불필요한 Rebalancing을 피할 수 없다.
* Rebalancing Trigger
- Consumer가 Consumer Group에서 탈퇴할 때
- 신규 Consumer가 Consumer Group에 합류할 때
- Consumer가 Topic 구독을 변경할 때
- Consumer Group이 Topic 메타 데이터의 변경 사항을 인지할 때 (Ex: Partition 증가)
* Rebalancing Process
- Group Coordinator는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보낸다.
- Consumer가 일시 중지하고 Offset을 Commit한다.
- Rebalancing Trigger의 동작이 이루어진다.
- Partition 재할당
- Consumer는 새로 할당받은 Partition에서 다시 Consume을 시작한다.
* Consumer Heartbeats
- Consumer 장애를 인지하기 위해서 사용하는 값
- Consumer는 poll()과 별도로 백그라운드 Thread에서 Heartbeats를 보낸다.
- heartbeat.interval.ms (default: 3초) 마다 보낸다.
- session.timeout.ms (default: 10초) 동안 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제된다.
- poll() 역시 Heartbeats와 상관 없이 주기적으로 호출되어야 한다.
- max.poll.interval.ms (default: 5분)
Consumer Rebalancing이 발생할 경우 Consumer들은 메시지를 Consume 하지 못하기 때문에 불필요한 Rebalancing은 반드시 피해야 한다.
* 과도한 Rebalancing을 피하는 방법 -> 성능 최적화에 필수
- Consumer Group 멤버 고정
- Group의 각 Consumer에게 고유한 group.instance.id를 할당한다.
- Consumer는 LeaveGroupRequest를 사용하지 않아야 한다.
- Rejoin은 알려진 group.instance.id에 대한 Rebalance Trigger를 하지 않는다.
- Consumer 가 잠시 죽었을 때. Group 탈퇴 / Rejoin으로 인한 Rebalance 방지
- session.timeout.ms 튜닝
- heartbeat.interval.ms를 session.timeout.ms의 1/3으로 설정한다.
- group.min.session.timeout.ms (default : 6초), group.max.sesson.timeout.ms (default: 5 분) 사이의 값을 사용한다.
- 장점: Consumer가 Rejoin(재가입) 할 수 있는 더 많은 시간을 줄 수 있다.
- 단점: Consumer 장애를 감지하는 데 더 오랜 시간이 걸린다.
- max.poll.interval.ms 튜닝
- 데이터를 처리하는데 너무 많은 시간이 소요되면 Consumer Group에서 방출되는 현상이 발생하기 때문에 Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간을 제공한다. 단, 너무 크면 안 된다.
'Kafka' 카테고리의 다른 글
[Kafka] Replication (2) (1) | 2022.10.13 |
---|---|
[Kafka] Replication (1) (0) | 2022.10.12 |
[Kafka] Consumer (1) (0) | 2022.10.10 |
[Kafka] Producer (2) (0) | 2022.10.09 |
[Kafka] Producer (1) (0) | 2022.10.09 |