Flux란, Reactive Streams에서 정의한 Publisher의 구현체로 0..N 개의 데이터를 발행할 수 있다.
- 하나의 데이터를 전달할 때마다 onNext 이벤트 발생
- Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete 이벤트 발생
- Flux 내의 데이터 처리 중 에러가 발생하면 onError 이벤트
코드
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping("/flux/ex1")
public Flux<Event> fluxEx1() {
return Flux.just(
new Event(1, "event1"),
new Event(2, "event2")
);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping("/mono/ex1")
public Mono<List<Event>> monoEx1() {
return Mono.just(
List.of(
new Event(1, "event1"),
new Event(2, "event2")
)
);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
위 코드는 같은 결과를 가지는 코드이다. Flux = Mono<List...>
Flux.take() -> 설정된 개수를 넘어가면 Cancle
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping(value = "/flux/ex4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> fluxEx4() {
return Flux
.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
.take(10);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
=> 10 개의 데이터 응답
Flux.delayElements() -> 각 요소 당 Delay를 준다.
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping(value = "/flux/ex5", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> fluxEx5() {
return Flux
.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
.delayElements(Duration.ofSeconds(1))
.take(10);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
=> 1초에 한 개씩 10번 응답이 온다.
Flux.generate() -> 요소를 생성한다.
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping(value = "/flux/ex6", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> fluxEx6() {
return Flux
.<Event, Long>generate(() -> 1L, (id, sink) -> {
sink.next(new Event(id, "value" + id));
return ++id;
})
.delayElements(Duration.ofSeconds(1))
.take(10);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
=> id 값이 1부터 시작해서 다음 요소를 생성할 때 1 증가하여 Event가 생성된다.
Flux.zipWith() -> 타입이 다른 Flux를 합쳐준다.
@Slf4j
public class FluxEx {
@RestController
public static class FluxExampleController {
@GetMapping(value = "/flux/ex7", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> fluxEx7() {
Flux<String> flux1 = Flux
.generate(sink -> sink.next("value"));
Flux<Long> flux2 = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(flux1, flux2)
.map(tuple -> new Event(tuple.getT2(), tuple.getT1() + String.valueOf(tuple.getT2())))
.take(10);
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
}
=> 1초에 Event를 한 개씩 10번 응답한다.
* REF
https://www.youtube.com/watch?v=ScH7NZU_zvk&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=10
'Reactive-Programming' 카테고리의 다른 글
[Reactor] Mono란? (0) | 2022.10.03 |
---|---|
[Spring] WebFlux란? (0) | 2022.10.02 |
[Spring] Reactive Web (4) (0) | 2022.10.01 |
[Spring] Reactive Web (3) (0) | 2022.09.30 |
[Spring] Reactive Web (2) (0) | 2022.09.27 |