본문 바로가기

Kafka

[Spring + Kafka] Spring-Kafka-Streams 간단한 실습

kafka Streams

  • 어떤 Topic으로 들어오는 데이터를 Consume하여, streams api를 통해 처리된 후, 다른 Topic으로 전송(Producing) 하거나 끝내는 행위를 하게 된다.
  Consumer Stream
Consumer/Producer Seperate One
Processing Single Complex
Batch Processing O X
Threading/Pararellism X O
Stateful Operations Only Stateless O

 

build.gradle에 다음과 같은 dependency를 추가한다.

implementation 'org.apache.kafka:kafka-streams'

 

데이터 변경 후 다른  Topic 전달

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("test-streams");
        stream
                .peek(((key, value) -> System.out.println("[Stream] Message = " + value)))  // return 없이 종료
                .map(((key, value) -> KeyValue.pair(key, "change message.")))
                .to("test-streams-to");
        return stream;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "test-streams-id");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }
}
  • streamsBuilder.stream으로 KStream을 생성할 때, 두 번째 인자로 Consumed를 넘겨줌으로써 Key, Value 직렬화, 역직렬화를 설정할 수 있다.
  • Kafka Streams를 사용하기 위해서는 ApplicationId가 필요한데, ConsumerGroupId를 대체안으로 사용한다.
  • Kafka Stream과 연결할 Topic은 없을 시 자동으로 생성하지 않기 때문에 이전에 만들어야 한다.
  • .peek()를 사용하면 return 없이 다른 Topic으로 데이터를 전달한다.

 

@Service
public class KafkaStreamConsumer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(id = "test-streams-to-id", topics = "test-streams-to")
    public void listen(String message) {
        log.info("[Consumer] message = {}", message);
    }
}

@SpringBootApplication
public class KafkaStudyApplication {

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(KafkaStudyApplication.class, args);
	}

	@Bean
	public ApplicationRunner runner(KafkaTemplate<String, String> kafkaTemplate) {
		return args -> {
			while (true) {
				kafkaTemplate.send("test-streams", "test-message!!..");
				Thread.sleep(1000);
			}
		};
	}
}
  • Application 실행 시, test-streams로 메시지를 발송한다.
  • KafkaStreamConsumer에서는 test-streams-to Topic을 구독하여, Stream을 거친 데이터를 log로 출력한다.
  • 다음과 같은 결과를 확인할 수 있다.

 

Value 기준으로 Count 세기

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("test-streams");

        stream
                .groupBy(((key, value) -> value))  
                .count()
                .toStream()
                .peek(((key, value) -> log.info("[Stream] key = {}, value = {}", key, value)));
        
        return stream;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "test-streams-id");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }
}
  • groupBy()를 사용하면 value를 기준으로 Grouping한다.
  • count()를 사용하여 count를 측정하고 이를 log로 출력한다.
  • 위와 동일한 ApplicationRunner를 사용하면 다음과 같은 로그가 출력된다.

 

'Kafka' 카테고리의 다른 글

[Kafka] Apache Kafka Connect  (0) 2022.12.07
[Spring + Kafka] 에러 핸들링  (1) 2022.12.04
[Spring + Kafka] 간단 성능 모니터링  (0) 2022.12.02
[Spring + Kafka] AdminKafka 사용하기  (0) 2022.12.01
[Spring + Kafka] Consume Messages  (0) 2022.11.29