본문 바로가기

Kafka

[Kafka] Apache Kafka Connect System 간 메시지 전송이 필요할 때마다 중개 역할을 하는 Application을 개발할 것인가? 개발 인력 개발에 소요되는 시간 소스 코드 유지 보수 새로운 System간 연결에서 중개 역할을 하는 Application을 개발하고 사용하는 것은 매우 많은 비용이 소모된다. Apache Kafka Connect Kafka Connect는 Apache Kafka 안팎으로 데이터를 스트리밍하기 위한 Framework Kafka Connect는 다른 데이터 시스템을 Kafka와 통합하는 과정을 표준화한 Framework이다. 통합을 위한 Connector 개발, 배포, 관리를 단순화 한다. Kafka Connect를 사용함으로써 중개 역할을 하는 Application이 필요 없다. System간 메시지 전.. 더보기
[Spring + Kafka] Spring-Kafka-Streams 간단한 실습 kafka Streams 어떤 Topic으로 들어오는 데이터를 Consume하여, streams api를 통해 처리된 후, 다른 Topic으로 전송(Producing) 하거나 끝내는 행위를 하게 된다. Consumer Stream Consumer/Producer Seperate One Processing Single Complex Batch Processing O X Threading/Pararellism X O Stateful Operations Only Stateless O build.gradle에 다음과 같은 dependency를 추가한다. implementation 'org.apache.kafka:kafka-streams' 데이터 변경 후 다른 Topic 전달 @Configuration @Enab.. 더보기
[Spring + Kafka] 에러 핸들링 기본 정책 Spring-Kafka를 통해서 Consumer를 구현하고, Consumer에서 처리하는 과정에서 오류가 발생하면 기본 설정으로 최초 요청을 포함해 최대 10회까지 재시도를 한다. 모든 재시도 실패하게 되면 해당 메시지는 skip 처리 된다. Spring-Kafka 에러 핸들링 spring-kafka 2.8 이전 버전에서는 KafkaListenerContainerFactory에 RetryTemplate()을 설정하여 재시도 처리를 진행했으나, 2.8 이후 버전에서는 Deprecated 되었다. DefaultErrorHandler를 생성하여 에러를 처리할 수 있다. 실습 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 @Bean public Kafka.. 더보기
[Spring + Kafka] 간단 성능 모니터링 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 코드를 사용한 Metrics 확인 @Bean public ApplicationRunner runner(KafkaTemplate kafkaTemplate, KafkaListenerEndpointRegistry registry) { return args -> { Map producerMetrics = kafkaTemplate.metrics(); for (MetricName metricName : producerMetrics.keySet()) { log.info("metricName = {}, value = {}", metricName.description(), producerMetrics.get(metricName).. 더보기
[Spring + Kafka] AdminKafka 사용하기 Spring이 제공하는 Adminkafka를 사용하여 Topic, 설정 정보 등의 다양한 Kafka 정보를 조회 및 변경, 삭제할 수 있다. 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 Topic 정보 확인하기 @Service public class KafkaManager { private final AdminClient adminClient; private final Logger log = LoggerFactory.getLogger(this.getClass()); public KafkaManager(KafkaAdmin kafkaAdmin) { this.adminClient = AdminClient.create(kafkaAdmin.getConfigurati.. 더보기
[Spring + Kafka] Consume Messages Message Listener Record MessageListener Auto Commit AcknowlegingMessageListener Manual Commit ConsumerAwareMessageListener Consumer 객체 활용 AcknowledgingConsumerAwareMessageListener Manual Commit + Consumer 객체 활용 Batch BatchMessageListener Auto Commit BatchAcknowlegingMessageListener Manual Commit BatchConsumerAwareMessageListener Consumer 객체 활용 BatchAcknowledgingConsumerAwareMessageListener Manua.. 더보기
[Spring + Kafka] Publish Messages 메시지 발행 방법 KafkaTemplate 사용 ProducerFactory 클래스를 사용해서 생성한다. 트랜잭션을 사용하지 않는 경우, Singleton으로 생성한다. flush()를 사용할 경우, 같은 Producer를 사용하는 다른 Thread에서 지연 현상이 발생할 수 있다. * 2.3 이후 부터는 producerPerThread 속성이 추가되어, true로 설정할 경우, 각 Thread에서 별도의 생성자를 만들고 캐시 처리한다. -> flush() 사용에도 지연 현상이 발생하지 않는다. 2.5.10 이후에는 설정을 업데이트하거나, 제거할 수 있는 Method를 제공한다. SSL 키 변경 등에서 유용할게 사용할 수 있다. reset()을 사용하면, 기존 Producer는 닫고 새로운 설정으로 Pr.. 더보기
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 Topic 생성 시 고려해야 할 점 Topic 명 Topic 명은 한 번 정하면, 바꾸기가 매우 어렵기 때문에 Rule을 적용한 패턴을 통해 의미를 부여한다. Topic의 파티션 개수 계산 파티션 개수: 1초 당 메시지 발행 수 / Consumer Thread 1개가 1초당 처리하는 메시지 수 파티션 개수를 늘릴 수는 있지만 줄일 수는 없다. Retention 시간 kafka borker가 메시지를 어느 시간까지 보관할지 시간이나 byte에 따라서 초과 시, 데이터를 삭제한다. 디스크 크기와 데이터의 중요성에 따라 판단한다. 환경: Spring Boot Project 2.7.5 + Spring-Kafka 2.8.10 Application.yml 설정 spring: kafka: bootstrap-serve.. 더보기