kafka Streams
- 어떤 Topic으로 들어오는 데이터를 Consume하여, streams api를 통해 처리된 후, 다른 Topic으로 전송(Producing) 하거나 끝내는 행위를 하게 된다.
Consumer | Stream | |
Consumer/Producer | Seperate | One |
Processing | Single | Complex |
Batch Processing | O | X |
Threading/Pararellism | X | O |
Stateful Operations | Only Stateless | O |
build.gradle에 다음과 같은 dependency를 추가한다.
implementation 'org.apache.kafka:kafka-streams'
데이터 변경 후 다른 Topic 전달
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
KStream<String, String> stream = streamsBuilder.stream("test-streams");
stream
.peek(((key, value) -> System.out.println("[Stream] Message = " + value))) // return 없이 종료
.map(((key, value) -> KeyValue.pair(key, "change message.")))
.to("test-streams-to");
return stream;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "test-streams-id");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
}
- streamsBuilder.stream으로 KStream을 생성할 때, 두 번째 인자로 Consumed를 넘겨줌으로써 Key, Value 직렬화, 역직렬화를 설정할 수 있다.
- Kafka Streams를 사용하기 위해서는 ApplicationId가 필요한데, ConsumerGroupId를 대체안으로 사용한다.
- Kafka Stream과 연결할 Topic은 없을 시 자동으로 생성하지 않기 때문에 이전에 만들어야 한다.
- .peek()를 사용하면 return 없이 다른 Topic으로 데이터를 전달한다.
@Service
public class KafkaStreamConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@KafkaListener(id = "test-streams-to-id", topics = "test-streams-to")
public void listen(String message) {
log.info("[Consumer] message = {}", message);
}
}
@SpringBootApplication
public class KafkaStudyApplication {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> kafkaTemplate) {
return args -> {
while (true) {
kafkaTemplate.send("test-streams", "test-message!!..");
Thread.sleep(1000);
}
};
}
}
- Application 실행 시, test-streams로 메시지를 발송한다.
- KafkaStreamConsumer에서는 test-streams-to Topic을 구독하여, Stream을 거친 데이터를 log로 출력한다.
- 다음과 같은 결과를 확인할 수 있다.
Value 기준으로 Count 세기
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
KStream<String, String> stream = streamsBuilder.stream("test-streams");
stream
.groupBy(((key, value) -> value))
.count()
.toStream()
.peek(((key, value) -> log.info("[Stream] key = {}, value = {}", key, value)));
return stream;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "test-streams-id");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
}
- groupBy()를 사용하면 value를 기준으로 Grouping한다.
- count()를 사용하여 count를 측정하고 이를 log로 출력한다.
- 위와 동일한 ApplicationRunner를 사용하면 다음과 같은 로그가 출력된다.
'Kafka' 카테고리의 다른 글
[Kafka] Apache Kafka Connect (0) | 2022.12.07 |
---|---|
[Spring + Kafka] 에러 핸들링 (1) | 2022.12.04 |
[Spring + Kafka] 간단 성능 모니터링 (0) | 2022.12.02 |
[Spring + Kafka] AdminKafka 사용하기 (0) | 2022.12.01 |
[Spring + Kafka] Consume Messages (0) | 2022.11.29 |