본문 바로가기

Reactive-Programming

[Reactor] Flux란?

Flux란, Reactive Streams에서 정의한 Publisher의 구현체로 0..N 개의 데이터를 발행할 수 있다.
  • 하나의 데이터를 전달할 때마다 onNext 이벤트 발생
  • Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete 이벤트 발생
  • Flux 내의 데이터 처리 중 에러가 발생하면 onError 이벤트 


코드

@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping("/flux/ex1")
        public Flux<Event> fluxEx1() {
            return Flux.just(
                    new Event(1, "event1"),
                    new Event(2, "event2")
                    );
        }
    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}
@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping("/mono/ex1")
        public Mono<List<Event>> monoEx1() {
            return Mono.just(
                    List.of(
                        new Event(1, "event1"),
                        new Event(2, "event2")
                    )
            );
        }

    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}

 

위 코드는 같은 결과를 가지는 코드이다. Flux = Mono<List...>

 

Flux.take() -> 설정된 개수를 넘어가면 Cancle

@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping(value = "/flux/ex4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Event> fluxEx4() {
            return Flux
                    .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
                    .take(10);
        }

    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}

=> 10 개의 데이터 응답

 

Flux.delayElements() -> 각 요소 당 Delay를 준다.

@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping(value = "/flux/ex5", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Event> fluxEx5() {
            return Flux
                    .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
                    .delayElements(Duration.ofSeconds(1))
                    .take(10);
        }

    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}

=> 1초에 한 개씩 10번 응답이 온다.


Flux.generate() -> 요소를 생성한다.

@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping(value = "/flux/ex6", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Event> fluxEx6() {
            return Flux
                    .<Event, Long>generate(() -> 1L, (id, sink) -> {
                        sink.next(new Event(id, "value" + id));
                        return ++id;
                    })
                    .delayElements(Duration.ofSeconds(1))
                    .take(10);
        }

    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}

=> id 값이 1부터 시작해서 다음 요소를 생성할  때 1 증가하여 Event가 생성된다.

 

Flux.zipWith() -> 타입이 다른 Flux를 합쳐준다.

@Slf4j
public class FluxEx {
    @RestController
    public static class FluxExampleController {
        @GetMapping(value = "/flux/ex7", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Event> fluxEx7() {
            Flux<String> flux1 = Flux
                    .generate(sink -> sink.next("value"));

            Flux<Long> flux2 = Flux.interval(Duration.ofSeconds(1));

            return Flux.zip(flux1, flux2)
                    .map(tuple -> new Event(tuple.getT2(), tuple.getT1() + String.valueOf(tuple.getT2())))
                    .take(10);
        }

    }

    @Data
    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}

=> 1초에 Event를 한 개씩 10번 응답한다.

 

* REF

https://www.youtube.com/watch?v=ScH7NZU_zvk&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=10

'Reactive-Programming' 카테고리의 다른 글

[Reactor] Mono란?  (0) 2022.10.03
[Spring] WebFlux란?  (0) 2022.10.02
[Spring] Reactive Web (4)  (0) 2022.10.01
[Spring] Reactive Web (3)  (0) 2022.09.30
[Spring] Reactive Web (2)  (0) 2022.09.27