* 본 글에서는 Spring + Kafka를 매우 간단하게 적용해 볼 예정이다.
kafka 간단 정리
- Porudcer는 Topic에 데이터를 적재하고, Consumer는 Topic에서 데이터를 가져와 처리한다.
- Topic은 키 값에 따라 여러 파티션으로 나눠서 적재(파티셔닝)되고, Consumer Group의 포함된 하나의 Comsumer가 하나의 Partition을 담당하여 데이터를 처리한다.
- Consumer가 데이터를 처리하면, ACK를 날려 데이터를 처리했다고 표시한다.
- ACK를 날리는 방법은 여러 가지가 존재한다.
Kafka의 자세한 설명은 다음 링크를 통해서 확인할 수 있다.https://yeongchan1228.tistory.com/52
kafka 설치 및 설정
환경: Ubuntu 20.04 LTS
1. 자바 설치
sudo apt-get update
sudo apt-get install openjdk-11-jdk
2. Kafka 계정 생성
sudo adduser kafka
su -l kafka
3. Zookeeper 설치
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -xvf apache-zookeeper-3.6.3-bin.tar.gz
cp zoo_sample.cfg zoo.cfg # default 옵션으로 Zookeeper를 실행 시키기 위해서 zoo.cfg로 복사한다.
- Zookeeper 실행: bin/zkServer.sh start
4. Kafka 설치
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvf kafka_2.13-2.8.0.tgz
- Kafka 실행: bin/kafka-server-start.sh config/server.properties
5. Topic 설정
- Topic 생성
- bin/kafka-topics.sh --create --topic test-event --bootstrap-server localhost:9092
- localhost:9092에 test-event topic을 생성한다.
- Topic List 확인
- bin/kafka-topics.sh --list --zookeeper localhost:2181
- Topic의 메타 정보는 Zookeeper에서 관리하기 때문에, Zookeeper에서 Topic 목록을 확인한다.
Spring Boot Project
1. build.gradle에 kafka dependency 추가
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
2. application.yml 설정 정보 추가
spring:
kafka:
consumer:
bootstrap-servers: {host}:{port} #consumer server ip + port, list = ,
group-id: alarm # consumerGroupId
auto-offset-reset: latest # When a new consumer is added to a topic that has already been produced, set the reading point from where
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
listener:
ack-mode: manual # Send ACK manually.
producer:
bootstrap-servers: {host}:{port} #producer server ip + port, list = ,
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
topic:
alarm: test-event
- spring.kafka.bootstrap-servers: 카프카 서버 정보
- spring.kafka.consumer.group.id: consumerGroupId
- spring.kafka.consumer.auto-offset-reset: 새로운 consumer가 추가되었을 때 어디서 부터 읽을 것인지
- ealiest: 맨 처음부터 다시
- latest: 이전의 데이터는 무시, 현재부터 읽기
- spring.kafka.consumer.max-poll-records: consumer가 한 번에 가져오는 데이터 개수
3. Producer / Consumer 구현
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {
private final KafkaTemplate<Long, AlarmEvent> kafkaTemplate;
@Value("${spring.kafka.topic.alarm}")
private String alarmTopicName;
public void send(AlarmEvent alarmEvent) {
kafkaTemplate.send(alarmTopicName, alarmEvent.getTargetPost().getMember().getId(), alarmEvent);
log.info("Finished send.");
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {
private final ObjectMapper objectMapper;
private final AlarmService alarmService;
@KafkaListener(topics = "${spring.kafka.topic.alarm}")
public void consumeAlarm(AlarmEvent alarmEvent, Acknowledgment ack) {
try {
log.info("Consume the event {}", objectMapper.writeValueAsString(alarmEvent));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
alarmService.send(alarmEvent.getAlarmType(), alarmEvent.getFromMember(), alarmEvent.getTargetPost(), alarmEvent.getMsg());
}
}
@Data
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class AlarmEvent {
private AlarmType alarmType;
private Member fromMember;
private Post targetPost;
private String msg;
}
- 이번 실습에서는 알림을 전송할 때, 알림을 저장하고, 알림을 보내는 작업을 비동기적으로 처리하기 위해서 Kafka를 사용하였다.
- Producer가 알림 정보를 Topic으로 전송하고 Consumer가 알림 정보를 Topic으로부터 가져와서 알림을 저장, 전송 작업을 진행한다.
실행 결과 로그
s.s.messaging.consumer.AlarmConsumer : Consume the event {"alarmType":"NEW_LIKE_ON_POST","fromMember":{"createdDate":"2022-11-24T21:38:38.51321","lastModifiedDate":"2022-11-24T21:38:38.51321","id":8,"username":"test8@test.com","password":"$2a$10$.e.wL5NxGLX52B6nlcDzmOaYCvdOF9PhDaUzEQ9KivSepJ.S.G7H2","role":"ROLE_USER","posts":[]},"targetPost":{"createdDate":"2022-11-20T02:17:13.783779","lastModifiedDate":"2022-11-20T02:17:13.783779","id":1,"title":"modified Title","content":"modified Content","member":{"createdDate":"2022-11-20T01:45:49.519546","lastModifiedDate":"2022-11-20T01:45:49.519546","id":4,"username":"test4@test.com","password":"$2a$10$TlEYkasPMSFP7q0DqLK13ummr6mBXKvtRYIIdUVyBA.NgZMCz1Hfm","role":"ROLE_USER","posts":[]}},"msg":"New Like!!!"}
// 알림 저장
Hibernate:
insert
into
alarm
(created_date, last_modified_date, alarm_type, from_member_id, post_id, to_member_id)
values
(?, ?, ?, ?, ?, ?)
// 알림 전송
[SseEmitter] Send test4@test.com
- 다음과 같이 AlarmEvent를 잘 받아와서 처리하는 것을 확인할 수 있다.
* Aws EC2에서 Kafka를 실행하고 Local 환경에서 Spring Project를 실행한다면, Kafka의 기본 설정으로는 disconnection 오류가 발생 할 것이다.
원인
kafka 폴더/config/server.properties에서 advertised.listeners은 client가 server의 kafka에 접근하여 메시지를 발행하거나 소비하려고 할 때, 접근할 수 있도록 하는 설정이다.
해당 옵션이 주석 처리되어 있거나, localhost:9092로 설정되어 있으면 client와 server가 같은 환경이라면 상관 없지만 외부 client라면 localhost:9092로 접속해도 메시지를 발행 및 소비할 수 없기 때문이다.
해결
kafka 폴더/config/server.properties의 advertised.listeners 설정을 Aws EC2 인스턴스의 ip로 설정하면 된다.
'Kafka' 카테고리의 다른 글
[Spring + Kafka] Publish Messages (0) | 2022.11.28 |
---|---|
[Spring + Kafka] Topic 생성 및 조회, 삭제하기 (0) | 2022.11.27 |
[Kafka] Exactly Once Semantics(EOS) (0) | 2022.10.17 |
[Kafka] Kafka Log File (0) | 2022.10.15 |
[Kafka] Partition Assignment Strategy (0) | 2022.10.14 |