ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring + Kafka] 간단 실습
    Kafka 2022. 11. 26. 19:44

     

    * 본 글에서는 Spring + Kafka를 매우 간단하게 적용해 볼 예정이다.

     

    kafka 간단 정리

    • Porudcer는 Topic에 데이터를 적재하고, Consumer는 Topic에서 데이터를 가져와 처리한다.
    • Topic은 키 값에 따라 여러 파티션으로 나눠서 적재(파티셔닝)되고, Consumer Group의 포함된 하나의 Comsumer가 하나의 Partition을 담당하여 데이터를 처리한다.
    • Consumer가 데이터를 처리하면, ACK를 날려 데이터를 처리했다고 표시한다.
    • ACK를 날리는 방법은 여러 가지가 존재한다.

     

    Kafka의 자세한 설명은 다음 링크를 통해서 확인할 수 있다.https://yeongchan1228.tistory.com/52

     

    [Kafka] Apache Kafka란?

    데이터가 흐르는 Event Streams를 받아 해당 데이터를 필요로 하는 곳으로 전송해주는 시스템이다. 한마디로, 움직이는 데이터를 처리하는 플랫폼(Event Streaming Platform) Event: 서비스에서 일어나는 모

    yeongchan1228.tistory.com

     

     

    kafka 설치 및 설정

    환경: Ubuntu 20.04 LTS

     

    1. 자바 설치

    sudo apt-get update
    sudo apt-get install openjdk-11-jdk

     

    2. Kafka 계정 생성

    sudo adduser kafka
    su -l kafka

     

    3. Zookeeper 설치

    wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
    tar -xvf apache-zookeeper-3.6.3-bin.tar.gz
    cp zoo_sample.cfg zoo.cfg # default 옵션으로 Zookeeper를 실행 시키기 위해서 zoo.cfg로 복사한다.
    • Zookeeper 실행: bin/zkServer.sh start

     

    4. Kafka 설치

    wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
    tar -xvf kafka_2.13-2.8.0.tgz
    • Kafka 실행: bin/kafka-server-start.sh config/server.properties

     

    5. Topic 설정

    • Topic 생성
      • bin/kafka-topics.sh --create --topic test-event --bootstrap-server localhost:9092
      • localhost:9092에 test-event topic을 생성한다.

    • Topic List 확인
      • bin/kafka-topics.sh --list --zookeeper localhost:2181
      • Topic의 메타 정보는 Zookeeper에서 관리하기 때문에, Zookeeper에서 Topic 목록을 확인한다.

     

    Spring Boot Project

    1. build.gradle에 kafka dependency 추가

    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.kafka:spring-kafka-test'

     

    2. application.yml 설정 정보 추가

    spring:
      kafka:
        consumer:
          bootstrap-servers: {host}:{port} #consumer server ip + port, list = ,
          group-id: alarm # consumerGroupId
          auto-offset-reset: latest # When a new consumer is added to a topic that has already been produced, set the reading point from where
          key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring.json.trusted.packages: "*"
        listener:
          ack-mode: manual # Send ACK manually.
        producer:
          bootstrap-servers: {host}:{port} #producer server ip + port, list = ,
          key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        topic:
          alarm: test-event
    • spring.kafka.bootstrap-servers: 카프카 서버 정보
    • spring.kafka.consumer.group.id: consumerGroupId
    • spring.kafka.consumer.auto-offset-reset: 새로운 consumer가 추가되었을 때 어디서 부터 읽을 것인지
      • ealiest: 맨 처음부터 다시
      • latest: 이전의 데이터는 무시, 현재부터 읽기
    • spring.kafka.consumer.max-poll-records: consumer가 한 번에 가져오는 데이터 개수

     

    3. Producer / Consumer 구현

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class AlarmProducer {
    
        private final KafkaTemplate<Long, AlarmEvent> kafkaTemplate;
    
        @Value("${spring.kafka.topic.alarm}")
        private String alarmTopicName;
    
        public void send(AlarmEvent alarmEvent) {
            kafkaTemplate.send(alarmTopicName, alarmEvent.getTargetPost().getMember().getId(), alarmEvent);
            log.info("Finished send.");
        }
    }
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class AlarmConsumer {
    
        private final ObjectMapper objectMapper;
        private final AlarmService alarmService;
    
        @KafkaListener(topics = "${spring.kafka.topic.alarm}")
        public void consumeAlarm(AlarmEvent alarmEvent, Acknowledgment ack) {
            try {
                log.info("Consume the event {}", objectMapper.writeValueAsString(alarmEvent));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
    
            alarmService.send(alarmEvent.getAlarmType(), alarmEvent.getFromMember(), alarmEvent.getTargetPost(), alarmEvent.getMsg());
        }
    }
    
    @Data
    @AllArgsConstructor(staticName = "of")
    @NoArgsConstructor
    public class AlarmEvent {
        private AlarmType alarmType;
    
        private Member fromMember;
    
        private Post targetPost;
    
        private String msg;
    }
    • 이번 실습에서는 알림을 전송할 때, 알림을 저장하고, 알림을 보내는 작업을 비동기적으로 처리하기 위해서 Kafka를 사용하였다.
    • Producer가 알림 정보를 Topic으로 전송하고 Consumer가 알림 정보를 Topic으로부터 가져와서 알림을 저장, 전송 작업을 진행한다.

    실행 결과 로그

    s.s.messaging.consumer.AlarmConsumer : Consume the event {"alarmType":"NEW_LIKE_ON_POST","fromMember":{"createdDate":"2022-11-24T21:38:38.51321","lastModifiedDate":"2022-11-24T21:38:38.51321","id":8,"username":"test8@test.com","password":"$2a$10$.e.wL5NxGLX52B6nlcDzmOaYCvdOF9PhDaUzEQ9KivSepJ.S.G7H2","role":"ROLE_USER","posts":[]},"targetPost":{"createdDate":"2022-11-20T02:17:13.783779","lastModifiedDate":"2022-11-20T02:17:13.783779","id":1,"title":"modified Title","content":"modified Content","member":{"createdDate":"2022-11-20T01:45:49.519546","lastModifiedDate":"2022-11-20T01:45:49.519546","id":4,"username":"test4@test.com","password":"$2a$10$TlEYkasPMSFP7q0DqLK13ummr6mBXKvtRYIIdUVyBA.NgZMCz1Hfm","role":"ROLE_USER","posts":[]}},"msg":"New Like!!!"}

    // 알림 저장
    Hibernate: 
        insert 
        into
            alarm
            (created_date, last_modified_date, alarm_type, from_member_id, post_id, to_member_id) 
        values
            (?, ?, ?, ?, ?, ?)

    // 알림 전송
    [SseEmitter] Send test4@test.com
    • 다음과 같이 AlarmEvent를 잘 받아와서 처리하는 것을 확인할 수 있다.

     

     

    * Aws EC2에서 Kafka를 실행하고 Local 환경에서 Spring Project를 실행한다면, Kafka의 기본 설정으로는 disconnection 오류가 발생 할 것이다.

     

    원인

    kafka 폴더/config/server.properties에서 advertised.listeners은 client가 server의 kafka에 접근하여 메시지를 발행하거나 소비하려고 할 때, 접근할 수 있도록 하는 설정이다.

    해당 옵션이 주석 처리되어 있거나, localhost:9092로 설정되어 있으면 client와 server가 같은 환경이라면 상관 없지만 외부 client라면 localhost:9092로 접속해도 메시지를 발행 및 소비할 수 없기 때문이다.

     

    해결

    kafka 폴더/config/server.properties의 advertised.listeners 설정을 Aws EC2 인스턴스의 ip로 설정하면 된다.

    'Kafka' 카테고리의 다른 글

    [Spring + Kafka] Publish Messages  (0) 2022.11.28
    [Spring + Kafka] Topic 생성 및 조회, 삭제하기  (0) 2022.11.27
    [Kafka] Exactly Once Semantics(EOS)  (0) 2022.10.17
    [Kafka] Kafka Log File  (0) 2022.10.15
    [Kafka] Partition Assignment Strategy  (0) 2022.10.14

    댓글

Designed by Tistory.