
[Reactive Streams] Operators

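Summary of the previous post
Publisher (supplier) -> the Subscriber subscribes to the Publisher -> the Subscriber declares how many items it can handle by calling Subscription.request(n) from onSubscribe -> the Publisher calls the Subscriber's onNext at most that many times -> once all data has been delivered the Publisher calls the Subscriber's onComplete; if an error occurs it calls onError instead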
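
The handshake summarized above can be sketched as follows. This is only a minimal, synchronous sketch: it uses java.util.concurrent.Flow from the JDK as a stand-in for the Reactive Streams interfaces (the four signals are the same), and the names DemandSketch, fromIterable and run are made up for this example. The Subscriber asks for two items at a time, and the Publisher never emits more than was requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    /** Synchronous publisher that emits at most as many items as were requested. */
    static <T> Flow.Publisher<T> fromIterable(Iterable<T> items) {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private long demand;       // items requested but not yet delivered
            private boolean emitting;  // guards against re-entrant request() calls
            private boolean done;      // set after cancel, onComplete or onError

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    sub.onError(new IllegalArgumentException("request must be > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return;  // the loop below is already running and sees the new demand
                emitting = true;
                try {
                    while (!done && demand > 0 && it.hasNext()) {
                        demand--;
                        sub.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        sub.onComplete();
                    }
                } catch (Throwable t) {
                    if (!done) {
                        done = true;
                        sub.onError(t);
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Runs the handshake and returns the signals in the order they happened. */
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        fromIterable(List.of(1, 2, 3, 4, 5)).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                signals.add("onSubscribe");
                s.request(2); // initial demand
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
                if (++received % 2 == 0) {
                    signals.add("request(2)");
                    subscription.request(2); // ask for the next batch
                }
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The iterPublisher used in the listings of this post is simpler: it ignores n and pushes every item on the first request, which is fine as long as the Subscriber requests Long.MAX_VALUE.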

 

Operators

  • Publisher -> [Data1] -> Operator -> [Data2] -> Operator -> [Data3] ... -> Subscriber
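    • An Operator transforms the data on its way from the Publisher to the Subscriber.
    • The Publisher and Subscriber do not have to be connected directly; one or more Operators can sit between them.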

Map
Publisher -> [Data1] -> mapPub -> [Data2] -> Subscriber

 
| Code

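import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Assumption: Publisher/Subscriber/Subscription are the org.reactivestreams interfaces
// (the java.util.concurrent.Flow.* interfaces have the same shape).
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class OperatorPubSub {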

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, i -> i * 10);
        Publisher<Integer> mapPub2 = mapPub(mapPub, i -> -i);

        mapPub2.subscribe(printSubscribe());
    }

    private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
        return new Publisher<>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            // Simplification: n is ignored and every item is emitted on the first request.
                            iter.forEach(num -> sub.onNext(num));
                            sub.onComplete();
                        } catch (Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };
    }

    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> function) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s);
                    }

                    @Override
                    public void onNext(Integer item) {
                        sub.onNext(function.apply(item));
                    }

                    @Override
                    public void onError(Throwable t) {
                        sub.onError(t);
                    }

                    @Override
                    public void onComplete() {
                        sub.onComplete();
                    }
                });
            }
        };
    }

    private static Subscriber<Integer> printSubscribe() {
        return new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("[onNext] item = " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("[onError] throwable = " + throwable);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    }

}

=> mapPub creates a new Subscriber internally; it transforms each item and then delegates the result to the next (downstream) Subscriber.


Reduce
Publisher -> [Data1, ...] -> reducePub -> [Result] -> Subscriber

 
| Code

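import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Assumption: Publisher/Subscriber/Subscription are the org.reactivestreams interfaces.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class OperatorPubSub {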

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));

        Publisher<Integer> reducePub = reducePub(pub, 0, (a, b) -> a + b);
        reducePub.subscribe(printSubscribe());

    }

    private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
        return sub -> {
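            // DelegateSub: helper Subscriber (not part of this listing) that forwards
            // every signal to sub unless a method is overridden.
            pub.subscribe(new DelegateSub(sub) {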
                int result = init;

                @Override
                public void onNext(Integer item) {
                    result = bf.apply(this.result, item);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);
                    sub.onComplete();
                }
            });
        };
    }
    
    private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
        return new Publisher<>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(num -> sub.onNext(num));
                            sub.onComplete();
                        } catch (Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };
    }

    private static Subscriber<Integer> printSubscribe() {
        return new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("[onNext] item = " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("[onError] throwable = " + throwable);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    }
    
}
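
The reduce listing above relies on a DelegateSub class whose definition does not appear in this post. One possible shape for it is sketched below. This is an assumption, not the original code: it is written against java.util.concurrent.Flow from the JDK so that it runs without extra dependencies, and the names ReduceSketch, downstream, fromIterable and run are made up for the example. The idea is that DelegateSub forwards every signal unchanged, so an operator only overrides the signals it wants to change. reduce overrides onNext to accumulate silently and onComplete to emit the single result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;

class ReduceSketch {

    /** Forwards every signal downstream unchanged; operators override what they need. */
    static class DelegateSub<T, R> implements Flow.Subscriber<T> {
        protected final Flow.Subscriber<? super R> downstream;

        DelegateSub(Flow.Subscriber<? super R> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            downstream.onSubscribe(s);
        }

        @SuppressWarnings("unchecked")
        @Override
        public void onNext(T item) {
            downstream.onNext((R) item); // only safe when T and R are the same type
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }

    /** Folds all upstream items into one value and emits it when upstream completes. */
    static <T, R> Flow.Publisher<R> reduce(Flow.Publisher<T> source, R init, BiFunction<R, T, R> bf) {
        return sub -> source.subscribe(new DelegateSub<T, R>(sub) {
            private R result = init;

            @Override
            public void onNext(T item) {
                result = bf.apply(result, item); // accumulate, emit nothing yet
            }

            @Override
            public void onComplete() {
                downstream.onNext(result);
                downstream.onComplete();
            }
        });
    }

    /** Same simplification as the post's iterPublisher: n is ignored. */
    static <T> Flow.Publisher<T> fromIterable(Iterable<T> items) {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                items.forEach(sub::onNext);
                sub.onComplete();
            }

            @Override
            public void cancel() {
            }
        });
    }

    /** Sums 1..10 and returns the signals the final Subscriber saw. */
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        Flow.Publisher<Integer> sum =
                reduce(fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)), 0, (a, b) -> a + b);
        sum.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }
}
```

The field is called downstream rather than sub on purpose: inside the anonymous subclass an inherited field named sub would shadow the lambda parameter of the same name, which makes the code harder to read.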

Applying Generics
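import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Assumption: Publisher/Subscriber/Subscription are the org.reactivestreams interfaces.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class OperatorPubSub {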

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));

        // 1. map -> emits the same number of items as the source
        Publisher<String> mapPub = mapPub(pub, i -> "[" + i + "]");
        mapPub.subscribe(printSubscribe());

        // 2. reduce -> emits a different number of items (one result)
        Publisher<String> reducePub = reducePub(pub, "", (a, b) -> a + b);
        reducePub.subscribe(printSubscribe());

    }

    private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return sub -> {
            pub.subscribe(new DelegateSub<T, R>(sub) {
                R result = init;

                @Override
                public void onNext(T item) {
                    result = bf.apply(this.result, item);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);
                    sub.onComplete();
                }
            });
        };
    }

    private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> function) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T, R>(sub) {
                    @Override
                    public void onNext(T item) {
                        sub.onNext(function.apply(item));
                    }
                });
            }
        };
    }

    private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
        return new Publisher<>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(num -> sub.onNext(num));
                            sub.onComplete();
                        } catch (Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };
    }

    private static <T> Subscriber<T> printSubscribe() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                System.out.println("[onNext] item = " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("[onError] throwable = " + throwable);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    }

}
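
With static helpers such as mapPub and reducePub, a pipeline has to be written inside-out, for example reducePub(mapPub(pub, ...), ...). Wrapping the Publisher in a small class lets the same operators chain left to right. The sketch below illustrates that idea only: MiniFlux is a hypothetical name invented for this example, it is built on java.util.concurrent.Flow from the JDK, and unlike a real library it ignores the requested amount and cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical fluent wrapper around a Publisher; not a real library class. */
class MiniFlux<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> source;

    private MiniFlux(Flow.Publisher<T> source) {
        this.source = source;
    }

    /** Creates a publisher that emits the given items and then completes. */
    @SafeVarargs
    static <E> MiniFlux<E> just(E... items) {
        List<E> list = List.of(items);
        return new MiniFlux<E>(sub -> sub.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) { // simplification: n is ignored
                list.forEach(sub::onNext);
                sub.onComplete();
            }

            @Override
            public void cancel() {
            }
        }));
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> sub) {
        source.subscribe(sub);
    }

    /** One item in, one item out. */
    <R> MiniFlux<R> map(Function<? super T, ? extends R> fn) {
        return new MiniFlux<R>(sub -> source.subscribe(new Flow.Subscriber<T>() {
            public void onSubscribe(Flow.Subscription s) { sub.onSubscribe(s); }
            public void onNext(T item) { sub.onNext(fn.apply(item)); }
            public void onError(Throwable t) { sub.onError(t); }
            public void onComplete() { sub.onComplete(); }
        }));
    }

    /** Many items in, a single result out when the source completes. */
    <R> MiniFlux<R> reduce(R init, BiFunction<R, ? super T, R> bf) {
        return new MiniFlux<R>(sub -> source.subscribe(new Flow.Subscriber<T>() {
            private R acc = init;
            public void onSubscribe(Flow.Subscription s) { sub.onSubscribe(s); }
            public void onNext(T item) { acc = bf.apply(acc, item); }
            public void onError(Throwable t) { sub.onError(t); }
            public void onComplete() { sub.onNext(acc); sub.onComplete(); }
        }));
    }

    /** Doubles 1, 2, 3 and sums the results; returns the signals that arrived. */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        MiniFlux.just(1, 2, 3)
                .map(i -> i * 2)
                .reduce(0, (a, b) -> a + b)
                .subscribe(new Flow.Subscriber<Integer>() {
                    public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                    public void onNext(Integer item) { out.add("onNext(" + item + ")"); }
                    public void onError(Throwable t) { out.add("onError"); }
                    public void onComplete() { out.add("onComplete"); }
                });
        return out;
    }
}
```

Each operator returns a new wrapper around a new Publisher, so nothing runs until subscribe is called at the end of the chain.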

Reactor

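Reactor is an implementation of Reactive Streams, and it is the library Spring Framework has used to support reactive programming since version 5.

* Types of implementations

- Flux

  • An implementation of Publisher
  • A publisher that emits a stream of 0..N elements.
  • Simple data output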
Flux.create(e -> {
    e.next(1);
    e.next(2);
    e.next(3);
    e.complete();
})
    .log()
    .subscribe(System.out::println);

Execution result

  • log() -> Operator: logs each signal as it passes through
  • subscribe(Consumer consumer): the Consumer plays the role of onNext()
Flux.just(1, 2, 3)
    .log()
    .map(item -> item * 2)
    .log()
    .subscribe(System.out::println);

Execution result

  • just(): the most basic way to create a sequence
  • map() -> Operator
Flux.just(1, 2, 3)
    .log()
    .map(item -> item * 2)
    .reduce(0, (a, b) -> a + b)
    .log()
    .subscribe(System.out::println);

Execution result

  • reduce(): folds all elements into a single value and returns a Mono (here 2 + 4 + 6 = 12)

When returning a Publisher

| Code

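import org.reactivestreams.Publisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController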
public class HelloController {

    @GetMapping("/hello")
    public Publisher<String> hello(String name) {
        return Flux.just("Hello " + name);
    }
}
  • Spring calls subscribe() on the returned Publisher automatically, so the controller does not subscribe itself.

REF.

https://www.youtube.com/watch?v=DChIxy9g19o&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=2

 

 
