* Producer가 데이터를 전달하는 방식
- At-Most-Once Semantics(최대 한 번)
- 확인 시간이 초과되거나 오류가 반환될 때, Producer가 재시도를 하지 않으면, 데이터가 Kafka Topic에 기록되지 않아 Consumer에 전달되지 않을 수 있다.
- 중복 가능성을 피하기 위해서 때때로 허용한다.
- At-Least-Once Semantics(최소 한 번)
- Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면, 메시지가 Kafka Topic에 최소 한 번 작성되었음을 보장한다.
- ack가 시간 초과되거나 오류를 수신하면, 메시지가 Kafka Topic에 기록되지 않았다고 가정하고 데이터를 재전송할 수 있다.
- Broker가 ack를 보내기 직전에 실패했지만, 데이터가 Kafka Topic에 정상적으로 기록된 후 재전송을 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 중복 데이터가 전송될 수 있다.
- Exactly-Once Sematics(정확히 한 번)
- Producer가 데이터 전송을 다시 시도하더라도 데이터가 최종 Consumer에게 정확히 한 번 전달된다.
- Messaging 시스템 자체와 데이터를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요하다.
- 예를 들어, 데이터를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 데이터를 다시 수신하게 된다.
Exactly-Once Sematics(EOS)
- 중복 데이터로 인한 중복 처리를 방지한다. -> EOS의 필요성
- 데이터가 정확히 한 번 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 애플리케이션이다.
- 클라이언트(Idempotent Producer)에서 생성되는 중복 데이터를 방지
- Transaction 기능을 사용하여, 하나의 Transaction 내의 모든 데이터가 모두 Write 되었는지 또는 Write 되지 않았는지를 확인한다. (Atomic Message)
- Java Client에서만 지원한다.
- Producer, Consumer
- Kafka Connect
- Kafka Streams API
- Confluent REST Proxy
- Confluent ksqlDB
- EOS 기능을 사용하기 위해서는 Transaction Coordinator를 사용해야 한다.
- Transaction Coordinator: Transaction Log를 관리하는 Broker Thread -> Commit, Rollback 처리
- 일련의 ID 번호(Producer Id, Sequence Number, Transaction Id)를 할당하고, 클라이언트가 해당 정보를 데이터 Header에 포함하여 데이터를 고유하게 식별한다.
- Sequence Number는 Broker가 중복된 데이터를 Skip할 수 있도록 도와준다.
- Idempotent 설정
- Producer 파라미터 중, enable.idempotence를 true 설정한다.
- Producer가 재시도를 하더라도 메시지 중복을 방지한다.
- 성능에 큰 영향이 없다. -> Header에 Id 값 하나만 추가하기 때문에
- Transaction 설정
- 각 Producer에 고유한 transactional.id를 설정한다.
- Producer를 Transaction API를 사용하여 개발한다.
- Consumer에서 isolation.level을 read_committed로 설정한다.
- Broker 파라미터는 Transaction을 위한 Default 값이 적용되어 있다. -> 필요시 수정
Transaction
- Transaction Coordinator
- Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행한다.
- Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행한다.
- Transaction Log
- 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게 모든 Transaction에 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소이다.
- 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게 모든 Transaction에 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소이다.
- TransactionId
- Producer를 고유하게 식별하기 위해서 사용되며, 동일한 TransactionId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개 또는 중단할 수 있다.
- Producer를 고유하게 식별하기 위해서 사용되며, 동일한 TransactionId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개 또는 중단할 수 있다.
- Brokcer 파라미터
transaction.id.timeout.ms | Transaction Coordinator가 Producer TransactionId로부터 Transaction 상태 업데이트를 수신하지 않고 사전에 만료되기 전에 대기하는 최대 시간(ms) | default: 604800000 (7 days) |
max.transaction.timeout.ms | Transaction에 허용되는 timeout 시간 | default: 900000 (15 분) |
transaction.state.log.replication.factor | Transaction State Topic의 Replication Facotr | default: 3 |
transaction.state.log.num.partitions | Transaction State Topic의 Partition 개수 | default: 50 |
transaction.state.log.min.isr | Transaction State Topic의 min ISR 개수 | default: 2 |
transaction.state.log.segment.bytes | Transaction State Topic의 Segment 크기 | default: 104857600 bytes |
- Producer 파라미터
enable.idempotence | 비활성화의 경우 Transaction을 사용할 수 없다. true로 설정하고 acks=all, retries > 1, max.inflight.requests.per.connection=1과 같이 사용해야 한다. |
default: false |
transaction.timeout.ms | Transaction Coordinator가 진행 중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간(ms) broker의 max.transaction.timeout.ms와 같이 사용해야 하고 해당 설정 값보다 크면 오류와 함께 요청이 실패된다. |
default: 600000 (60 초) |
transactional.id | Transaction 전달에 사용할 TransactionaIId | default: 없음 |
- Consumer 파리미터
- Consumer의 중복처리는 따로 로직을 작성해야 한다.
- 예를 들어, 데이터를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 데이터를 다시 수신하게 된다.
isolation.level | read_uncommitted: Offset 순서로 Commit된 데이터와 Commit되지 않은 데이터 모두 사용 read_committed: Non-Transaction 데이터 또는 Commit된 Transaction 데이터만 Offset 순서로 사용한다. |
default: read_uncommitted |
enable.auto.commit | false: Consumer Offset에 대한 Auto Commit을 Off | default: true |
* Transaction 처리 프로세스
- Transaction Coordinator 찾기
- Producer가 initTransactions()를 호출하여 Broker에게 FindCoordinatorRequest를 보내서 Transaction Coordinator의 위치를 찾고 Transaction Coordinator는 pid를 할당한다.
- Producer가 initTransactions()를 호출하여 Broker에게 FindCoordinatorRequest를 보내서 Transaction Coordinator의 위치를 찾고 Transaction Coordinator는 pid를 할당한다.
- Producer Id 얻기
- Producer가 Transaction Coordinator에게 InitPidRequest를 보내서(TransactionId 전달) Producer PID를 가져온다.
- PID의 Epoch를 높여 Producer의 이전 Zombie 인스턴스가 차단되고 Transaction을 진행할 수 없도록 한다.
- 해당 PID에 대한 매핑이 2a단계에서 Transaction Log에 기록한다.
- Transaction 시작
- Producer가 beginTransaction()을 호출하여 새 Transaction의 시작을 알린다.
- Producer는 Transaction이 시작되었음을 나타내는 로컬 상태를 기록한다.
- 첫 번째 Record가 전송될 때까지 Transaction Coordinator의 관점에서는 Transaction이 시작되지 않았다.
- AddPartitionsToTxnRequest
- Producer는 Transaction의 일부로 새 TopicPartition이 처음 기록될 때 이 요청을 Transaction Coordinator에게도 자동으로 보낸다.
- TopicPartition을 Transaction에 추가하면, 자동으로 Transaction Coordinator가 4a 단계에서 기록한다.
- Transaction에 추가된 첫 번째 Partition인 경우 Transaction Coordinator는 Transaction Timer도 시작한다.
- ProduceRequest
- Producer는 하나 이상의 ProducerRequests(Producer send()에서 시작됨)를 통해 User Topic Partitions에 메시지를 Write한다.
- 이러한 요청에는 5a에 표시된 대로 PID, Epoch 및 Sequence Number가 포함된다.
- AddOffsetCommitsToTxnRequest
- Producer에는 Consume되거나 Produce되는 데이터를 Batch 처리할 수 있는 sendOffsetsToTransaction()이 존재한다.
- sendOffsetsToTransaction 메서드는 groupId가 있는 AddOffsetCommitsToTxnRequests를 Transaction Coordinator에게 보낸다.
- Transaction Coordinator는 내부 __consumer_offsets Topic에서 이 Consumer Group에 대한 TopicPartition을 추론하고 6a 단계에서 Transaction Log에 이 Topic Partition의 추가를 기록한다.
- TxnOffsetCommitRequest
- Producer는 __consumer_offsets Topic에서 Offset을 유지하기 위해 TxnOffsetCommitRequest를 Consumer Coordinator에게 보낸다.
- Consumer Coordinator는 전송되는 PID 및 Producer Epoch를 사용하여 Producer가 이 요청을 처리할 수 있는 지(Zombie가 아닌지) 확인한다.
- Transaction이 Commit 될 때까지 해당 Offset은 외부에서 볼 수 없다.
- EndTxnRequest
- Producer는 Transaction을 완료하기 위해 commitTransaction() 또는 abortTransaction()을 호출한다.
- Producer는 Commit 되거나 Abort 되는지를 나타내는 데이터와 함께 Transaction Coordinator에게 EndTxnRequest를 보낸다.
- Transaction Log에 PREPARE_COMMIT 또는 PREPARE_ABORT 메시지를 write한다.
- WriteTxnMarkerRequest
- Transaction Coordinator가 Transaction에 포함된 각 TopicPartition의 Leader에게 이 요청을 보낸다.
- 해당 요청을 받은 Broker는 COMMIT(PID) 또는 ABORT(PID) 제어 메시지를 로그에 기록한다.
- __consumer_offsets Topic에도 Commit 또는 Abort가 로그에 기록된다.
- Consumer Coordinator는 Commit의 경우 이러한 Offset을 구체화하거나 Abort의 경우 무시해야 한다는 알림을 받는다.
- Writing thr final Commit or Abort Message
- Transaction Coordinator는 Transaction이 완료되었음을 나타내는 최종 COMMITTED 또는 ABORTED를 Transaction Log에 기록한다.
- Transaction Log에 있는 Transaction과 관련된 대부분의 메시지가 제거된다.
- Timestamp와 함께 완료된 Transaction의 PID만 유지하면 되므로 결국 Producer에 대한 TransactionId -> PID 매핑을 제거할 수 있다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 (0) | 2022.11.27 |
---|---|
[Spring + Kafka] 간단 실습 (0) | 2022.11.26 |
[Kafka] Kafka Log File (0) | 2022.10.15 |
[Kafka] Partition Assignment Strategy (0) | 2022.10.14 |
[Kafka] Replication (2) (1) | 2022.10.13 |