본문 바로가기

Kafka

[Spring + Kafka] 간단 실습

 

* 본 글에서는 Spring + Kafka를 매우 간단하게 적용해 볼 예정이다.

 

kafka 간단 정리

  • Porudcer는 Topic에 데이터를 적재하고, Consumer는 Topic에서 데이터를 가져와 처리한다.
  • Topic은 키 값에 따라 여러 파티션으로 나눠서 적재(파티셔닝)되고, Consumer Group의 포함된 하나의 Comsumer가 하나의 Partition을 담당하여 데이터를 처리한다.
  • Consumer가 데이터를 처리하면, ACK를 날려 데이터를 처리했다고 표시한다.
  • ACK를 날리는 방법은 여러 가지가 존재한다.

 

Kafka의 자세한 설명은 다음 링크를 통해서 확인할 수 있다.https://yeongchan1228.tistory.com/52

 

[Kafka] Apache Kafka란?

데이터가 흐르는 Event Streams를 받아 해당 데이터를 필요로 하는 곳으로 전송해주는 시스템이다. 한마디로, 움직이는 데이터를 처리하는 플랫폼(Event Streaming Platform) Event: 서비스에서 일어나는 모

yeongchan1228.tistory.com

 

 

kafka 설치 및 설정

환경: Ubuntu 20.04 LTS

 

1. 자바 설치

sudo apt-get update
sudo apt-get install openjdk-11-jdk

 

2. Kafka 계정 생성

sudo adduser kafka
su -l kafka

 

3. Zookeeper 설치

wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -xvf apache-zookeeper-3.6.3-bin.tar.gz
cp zoo_sample.cfg zoo.cfg # default 옵션으로 Zookeeper를 실행 시키기 위해서 zoo.cfg로 복사한다.
  • Zookeeper 실행: bin/zkServer.sh start

 

4. Kafka 설치

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvf kafka_2.13-2.8.0.tgz
  • Kafka 실행: bin/kafka-server-start.sh config/server.properties

 

5. Topic 설정

  • Topic 생성
    • bin/kafka-topics.sh --create --topic test-event --bootstrap-server localhost:9092
    • localhost:9092에 test-event topic을 생성한다.

  • Topic List 확인
    • bin/kafka-topics.sh --list --zookeeper localhost:2181
    • Topic의 메타 정보는 Zookeeper에서 관리하기 때문에, Zookeeper에서 Topic 목록을 확인한다.

 

Spring Boot Project

1. build.gradle에 kafka dependency 추가

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

 

2. application.yml 설정 정보 추가

spring:
  kafka:
    consumer:
      bootstrap-servers: {host}:{port} #consumer server ip + port, list = ,
      group-id: alarm # consumerGroupId
      auto-offset-reset: latest # When a new consumer is added to a topic that has already been produced, set the reading point from where
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
    listener:
      ack-mode: manual # Send ACK manually.
    producer:
      bootstrap-servers: {host}:{port} #producer server ip + port, list = ,
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    topic:
      alarm: test-event
  • spring.kafka.bootstrap-servers: 카프카 서버 정보
  • spring.kafka.consumer.group.id: consumerGroupId
  • spring.kafka.consumer.auto-offset-reset: 새로운 consumer가 추가되었을 때 어디서 부터 읽을 것인지
    • ealiest: 맨 처음부터 다시
    • latest: 이전의 데이터는 무시, 현재부터 읽기
  • spring.kafka.consumer.max-poll-records: consumer가 한 번에 가져오는 데이터 개수

 

3. Producer / Consumer 구현

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<Long, AlarmEvent> kafkaTemplate;

    @Value("${spring.kafka.topic.alarm}")
    private String alarmTopicName;

    public void send(AlarmEvent alarmEvent) {
        kafkaTemplate.send(alarmTopicName, alarmEvent.getTargetPost().getMember().getId(), alarmEvent);
        log.info("Finished send.");
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final ObjectMapper objectMapper;
    private final AlarmService alarmService;

    @KafkaListener(topics = "${spring.kafka.topic.alarm}")
    public void consumeAlarm(AlarmEvent alarmEvent, Acknowledgment ack) {
        try {
            log.info("Consume the event {}", objectMapper.writeValueAsString(alarmEvent));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }

        alarmService.send(alarmEvent.getAlarmType(), alarmEvent.getFromMember(), alarmEvent.getTargetPost(), alarmEvent.getMsg());
    }
}

@Data
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class AlarmEvent {
    private AlarmType alarmType;

    private Member fromMember;

    private Post targetPost;

    private String msg;
}
  • 이번 실습에서는 알림을 전송할 때, 알림을 저장하고, 알림을 보내는 작업을 비동기적으로 처리하기 위해서 Kafka를 사용하였다.
  • Producer가 알림 정보를 Topic으로 전송하고 Consumer가 알림 정보를 Topic으로부터 가져와서 알림을 저장, 전송 작업을 진행한다.

실행 결과 로그

s.s.messaging.consumer.AlarmConsumer : Consume the event {"alarmType":"NEW_LIKE_ON_POST","fromMember":{"createdDate":"2022-11-24T21:38:38.51321","lastModifiedDate":"2022-11-24T21:38:38.51321","id":8,"username":"test8@test.com","password":"$2a$10$.e.wL5NxGLX52B6nlcDzmOaYCvdOF9PhDaUzEQ9KivSepJ.S.G7H2","role":"ROLE_USER","posts":[]},"targetPost":{"createdDate":"2022-11-20T02:17:13.783779","lastModifiedDate":"2022-11-20T02:17:13.783779","id":1,"title":"modified Title","content":"modified Content","member":{"createdDate":"2022-11-20T01:45:49.519546","lastModifiedDate":"2022-11-20T01:45:49.519546","id":4,"username":"test4@test.com","password":"$2a$10$TlEYkasPMSFP7q0DqLK13ummr6mBXKvtRYIIdUVyBA.NgZMCz1Hfm","role":"ROLE_USER","posts":[]}},"msg":"New Like!!!"}

// 알림 저장
Hibernate: 
    insert 
    into
        alarm
        (created_date, last_modified_date, alarm_type, from_member_id, post_id, to_member_id) 
    values
        (?, ?, ?, ?, ?, ?)

// 알림 전송
[SseEmitter] Send test4@test.com
  • 다음과 같이 AlarmEvent를 잘 받아와서 처리하는 것을 확인할 수 있다.

 

 

* Aws EC2에서 Kafka를 실행하고 Local 환경에서 Spring Project를 실행한다면, Kafka의 기본 설정으로는 disconnection 오류가 발생 할 것이다.

 

원인

kafka 폴더/config/server.properties에서 advertised.listeners은 client가 server의 kafka에 접근하여 메시지를 발행하거나 소비하려고 할 때, 접근할 수 있도록 하는 설정이다.

해당 옵션이 주석 처리되어 있거나, localhost:9092로 설정되어 있으면 client와 server가 같은 환경이라면 상관 없지만 외부 client라면 localhost:9092로 접속해도 메시지를 발행 및 소비할 수 없기 때문이다.

 

해결

kafka 폴더/config/server.properties의 advertised.listeners 설정을 Aws EC2 인스턴스의 ip로 설정하면 된다.

'Kafka' 카테고리의 다른 글

[Spring + Kafka] Publish Messages  (0) 2022.11.28
[Spring + Kafka] Topic 생성 및 조회, 삭제하기  (0) 2022.11.27
[Kafka] Exactly Once Semantics(EOS)  (0) 2022.10.17
[Kafka] Kafka Log File  (0) 2022.10.15
[Kafka] Partition Assignment Strategy  (0) 2022.10.14