ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] Exactly Once Semantics(EOS)
    Kafka 2022. 10. 17. 20:36

    * Producer가 데이터를 전달하는 방식

    • At-Most-Once Semantics(최대 한 번)
      • 확인 시간이 초과되거나 오류가 반환될 때, Producer가 재시도를 하지 않으면, 데이터가 Kafka Topic에 기록되지 않아 Consumer에 전달되지 않을 수 있다.
      • 중복 가능성을 피하기 위해서 때때로 허용한다.

    • At-Least-Once Semantics(최소 한 번)
      • Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면, 메시지가 Kafka Topic에 최소 한 번 작성되었음을 보장한다.
      • ack가 시간 초과되거나 오류를 수신하면, 메시지가 Kafka Topic에 기록되지 않았다고 가정하고 데이터를 재전송할 수 있다.
      • Broker가 ack를 보내기 직전에 실패했지만, 데이터가 Kafka Topic에 정상적으로 기록된 후 재전송을 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 중복 데이터가 전송될 수 있다.

    • Exactly-Once Sematics(정확히 한 번)
      • Producer가 데이터 전송을 다시 시도하더라도 데이터가 최종 Consumer에게 정확히 한 번 전달된다.
      • Messaging 시스템 자체와 데이터를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요하다.
      • 예를 들어, 데이터를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 데이터를 다시 수신하게 된다.

    Exactly-Once Sematics(EOS)

    • 중복 데이터로 인한 중복 처리를 방지한다. -> EOS의 필요성
    • 데이터가 정확히 한 번 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 애플리케이션이다.
      • 클라이언트(Idempotent Producer)에서 생성되는 중복 데이터를 방지
      • Transaction 기능을 사용하여, 하나의 Transaction 내의 모든 데이터가 모두 Write 되었는지 또는 Write 되지 않았는지를 확인한다. (Atomic Message)

    • Java Client에서만 지원한다.
      • Producer, Consumer
      • Kafka Connect
      • Kafka Streams API
      • Confluent REST Proxy
      • Confluent ksqlDB

    • EOS 기능을 사용하기 위해서는 Transaction Coordinator를 사용해야 한다.
      • Transaction Coordinator: Transaction Log를 관리하는 Broker Thread -> Commit, Rollback 처리
      • 일련의 ID 번호(Producer Id, Sequence Number, Transaction Id)를 할당하고, 클라이언트가 해당 정보를 데이터 Header에 포함하여 데이터를 고유하게 식별한다.
      • Sequence Number는 Broker가 중복된 데이터를 Skip할 수 있도록 도와준다.

    • Idempotent 설정
      • Producer 파라미터 중, enable.idempotence를 true 설정한다.
      • Producer가 재시도를 하더라도 메시지 중복을 방지한다.
      • 성능에 큰 영향이 없다. -> Header에 Id 값 하나만 추가하기 때문에

    • Transaction 설정
      • 각 Producer에 고유한 transactional.id를 설정한다.
      • Producer를 Transaction API를 사용하여 개발한다.
      • Consumer에서 isolation.level을 read_committed로 설정한다.
      • Broker 파라미터는 Transaction을 위한 Default 값이 적용되어 있다. -> 필요시 수정

    Transaction

    • Transaction Coordinator
      • Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행한다.

    • Transaction Log
      • 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게 모든 Transaction에 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소이다.

    • TransactionId
      • Producer를 고유하게 식별하기 위해서 사용되며, 동일한 TransactionId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개 또는 중단할 수 있다.

    • Brokcer 파라미터
    transaction.id.timeout.ms Transaction Coordinator가 Producer TransactionId로부터 Transaction 상태 업데이트를 수신하지 않고 사전에 만료되기 전에 대기하는 최대 시간(ms) default: 604800000 (7 days)
    max.transaction.timeout.ms Transaction에 허용되는 timeout 시간 default: 900000 (15 분)
    transaction.state.log.replication.factor Transaction State Topic의 Replication Facotr default: 3
    transaction.state.log.num.partitions Transaction State Topic의 Partition 개수 default: 50
    transaction.state.log.min.isr Transaction State Topic의 min ISR 개수 default: 2
    transaction.state.log.segment.bytes Transaction State Topic의 Segment 크기 default: 104857600 bytes
    • Producer 파라미터
    enable.idempotence 비활성화의 경우 Transaction을 사용할 수 없다.
    true로 설정하고 acks=all, retries > 1, max.inflight.requests.per.connection=1과 같이 사용해야 한다.
    default: false
    transaction.timeout.ms Transaction Coordinator가 진행 중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간(ms)
    broker의 max.transaction.timeout.ms와 같이 사용해야 하고 해당 설정 값보다 크면 오류와 함께 요청이 실패된다.
    default: 600000 (60 초)
    transactional.id Transaction 전달에 사용할 TransactionaIId default: 없음
    • Consumer 파리미터
      • Consumer의 중복처리는 따로 로직을 작성해야 한다.
      • 예를 들어, 데이터를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 데이터를 다시 수신하게 된다.
    isolation.level read_uncommitted: Offset 순서로 Commit된 데이터와 Commit되지 않은 데이터 모두 사용
    read_committed: Non-Transaction 데이터 또는 Commit된 Transaction 데이터만 Offset 순서로 사용한다.
    default: read_uncommitted
    enable.auto.commit false: Consumer Offset에 대한 Auto Commit을 Off default: true

     

    * Transaction 처리 프로세스

    1. Transaction Coordinator 찾기
      • Producer가 initTransactions()를 호출하여 Broker에게 FindCoordinatorRequest를 보내서 Transaction Coordinator의 위치를 찾고 Transaction Coordinator는 pid를 할당한다.

    2. Producer Id 얻기
      • Producer가 Transaction Coordinator에게 InitPidRequest를 보내서(TransactionId 전달) Producer PID를 가져온다.
      • PID의 Epoch를 높여 Producer의 이전 Zombie 인스턴스가 차단되고 Transaction을 진행할 수 없도록 한다.
      • 해당 PID에 대한 매핑이 2a단계에서 Transaction Log에 기록한다.

    3. Transaction 시작
      • Producer가 beginTransaction()을 호출하여 새 Transaction의 시작을 알린다.
      • Producer는 Transaction이 시작되었음을 나타내는 로컬 상태를 기록한다.
      • 첫 번째 Record가 전송될 때까지 Transaction Coordinator의 관점에서는 Transaction이 시작되지 않았다.

    4. AddPartitionsToTxnRequest
      • Producer는 Transaction의 일부로 새 TopicPartition이 처음 기록될 때 이 요청을 Transaction Coordinator에게도 자동으로 보낸다.
      • TopicPartition을 Transaction에 추가하면, 자동으로 Transaction Coordinator가 4a 단계에서 기록한다.
      • Transaction에 추가된 첫 번째 Partition인 경우 Transaction Coordinator는 Transaction Timer도 시작한다.

    5. ProduceRequest
      • Producer는 하나 이상의 ProducerRequests(Producer send()에서 시작됨)를 통해 User Topic Partitions에 메시지를 Write한다.
      • 이러한 요청에는 5a에 표시된 대로 PID, Epoch 및 Sequence Number가 포함된다.

    6. AddOffsetCommitsToTxnRequest
      • Producer에는 Consume되거나 Produce되는 데이터를 Batch 처리할 수 있는 sendOffsetsToTransaction()이 존재한다.
      • sendOffsetsToTransaction 메서드는 groupId가 있는 AddOffsetCommitsToTxnRequests를 Transaction Coordinator에게 보낸다.
      • Transaction Coordinator는 내부 __consumer_offsets Topic에서 이 Consumer Group에 대한 TopicPartition을 추론하고 6a 단계에서 Transaction Log에 이 Topic Partition의 추가를 기록한다.

    7. TxnOffsetCommitRequest
      • Producer는 __consumer_offsets Topic에서 Offset을 유지하기 위해 TxnOffsetCommitRequest를 Consumer Coordinator에게 보낸다.
      • Consumer Coordinator는 전송되는 PID 및 Producer Epoch를 사용하여 Producer가 이 요청을 처리할 수 있는 지(Zombie가 아닌지) 확인한다.
      • Transaction이 Commit 될 때까지 해당 Offset은 외부에서 볼 수 없다.

    8. EndTxnRequest
      • Producer는 Transaction을 완료하기 위해 commitTransaction() 또는 abortTransaction()을 호출한다.
      • Producer는 Commit 되거나 Abort 되는지를 나타내는 데이터와 함께 Transaction Coordinator에게 EndTxnRequest를 보낸다.
      • Transaction Log에 PREPARE_COMMIT 또는 PREPARE_ABORT 메시지를 write한다.

    9. WriteTxnMarkerRequest
      • Transaction Coordinator가 Transaction에 포함된 각 TopicPartition의 Leader에게 이 요청을 보낸다.
      • 해당 요청을 받은 Broker는 COMMIT(PID) 또는 ABORT(PID) 제어 메시지를 로그에 기록한다.
      • __consumer_offsets Topic에도 Commit 또는 Abort가 로그에 기록된다.
      • Consumer Coordinator는 Commit의 경우 이러한 Offset을 구체화하거나 Abort의 경우 무시해야 한다는 알림을 받는다.

    10. Writing thr final Commit or Abort Message
      • Transaction Coordinator는 Transaction이 완료되었음을 나타내는 최종 COMMITTED 또는 ABORTED를 Transaction Log에 기록한다.
      • Transaction Log에 있는 Transaction과 관련된 대부분의 메시지가 제거된다.
      • Timestamp와 함께 완료된 Transaction의 PID만 유지하면 되므로 결국 Producer에 대한 TransactionId -> PID 매핑을 제거할 수 있다.

    'Kafka' 카테고리의 다른 글

    [Spring + Kafka] Topic 생성 및 조회, 삭제하기  (0) 2022.11.27
    [Spring + Kafka] 간단 실습  (0) 2022.11.26
    [Kafka] Kafka Log File  (0) 2022.10.15
    [Kafka] Partition Assignment Strategy  (0) 2022.10.14
    [Kafka] Replication (2)  (1) 2022.10.13

    댓글

Designed by Tistory.