본문 바로가기

Reactive-Programming

[Reactive Streams] Basic

Reactive Programming

  • 외부에서 데이터이벤트가 발생하면 그것에 맞게 대응하도록 프로그래밍하는 기법

1. Duality(쌍대성)

  • Iterable
    • Collection -> Iterable
public class IterableCustom {
    public static void main(String[] args) {
        Iterable<Integer> iter = () -> new Iterator<>() {
            final int MAX = 10;
            int cursor = 0;
            @Override public boolean hasNext() {
                return cursor < MAX;
            }
            @Override public Integer next() {
                return ++cursor; }
        };

        for (Integer integer : iter) {
            System.out.println(integer);
        }
    }
}
  • Observable
    • Observable은 Java 9부터 Deprecated 되었다.
    • Observer를 Oberservable에게 등록시킨 후 Data/Event 발생 시 해당 값을 Observer에게 전송.
    • 단점
      • 완료/종료를 알 수 없다. -> 연속되는 데이터의 끝, Complete의 개념이 없다.
      • Error 처리의 미흡

@SuppressWarnings("deprecation")
public class ObservableCustom {

    static class IntObservable extends Observable implements Runnable {

        @Override
        public void run() {
            for (int i = 1; i <= 10; i++) {
                setChanged();
                notifyObservers(i);
            }
        }
    }

    public static void main(String[] args) {
        Observer ob = (o, arg) -> System.out.println(arg);

        IntObservable intObservable = new IntObservable();
        intObservable.addObserver(ob);

        intObservable.run();
    }
}

> IterablePull, 받는 쪽에서 데이터를 당겨 온다.

> ObserverablePush, 받는 쪽으로 데이터를 밀어 넣는다.

=> int i = it.next(void) <-> void notifyObservers(i) - 쌍대성


Reactive Stream
  • non-blocking backPressure 비동기적 스트림 처리를 제공하는 표준
    • Back Pressure(배압)
      • 빠른 속도의 Publisher 와 느린 속도의 Subscriber 문제를 해결하는 원리
      • Subscriber 가 처리할 수 있는 만큼의 데이터를 전송한다.

  • 관찰자 패턴(Observer Pattern)반복자 패턴(Iterator Pattern), 함수형 패러다임의 조합으로 정의
  • ObserverPattern확장된 개념을 포함하고 있다.
  • Interface Summary
    1. Publisher<T>
      • Observable 역할
      • 연속된 요소들을 제공한다.
      • Subscriber 에게 요소들을 전달한다.
      • .addObserver() 대신 .subscribe() 사용한다.
      • Publisher 가 Subscriber 에게 전달 해야 하는 요소로 onSubscribe, onNext*, (onError | onComplete)? 가 존재한다.

    2. Subscriber<T>
      • Observer 역할

    3. Subscription
    4. Processor<T, R>

  • Subscriber -> Publisher : 구독
  • Publisher -> Subscriber : Subscription 전달
  • Subscriber : request 로 처리할 수 있는 데이터 설정 (Back Pressure)
  • Publisher : Subscription 을 통해 데이터 전달
  • 완료 시 onComplete, 오류 시 onError 종료

위 개념을 다음과 같이 코드로 간략하게 표현할 수 있다.

/**
 * Created by YC on 2022/09/06.
 */
public class PubSub {
    public static void main(String[] args) {
        Iterable<Integer> iter = List.of(1, 2, 3, 4, 5);
        ExecutorService es = Executors.newSingleThreadExecutor();

        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                Iterator<Integer> it = iter.iterator();

                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        es.execute(() -> {
                            int i = 0;
                            try {
                                while (i++ < n) {
                                    if (it.hasNext()) {
                                        subscriber.onNext(it.next());
                                    } else {
                                        subscriber.onComplete();
                                        break;
                                    }
                                }
                            } catch (Exception e) {
                                subscriber.onError(e);
                            }
                        });
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber<Integer> s = new Subscriber<>() {
            Subscription subscription;
            final int MAX = 1;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println(Thread.currentThread().getName() + " onSubscribe");

                this.subscription = subscription;
                subscription.request(MAX);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println(Thread.currentThread().getName() + " onNext -> " + item);

                this.subscription.request(MAX);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        p.subscribe(s);

        try {
            es.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdown();
        }
    }
}

REF.

https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=1

 

'Reactive-Programming' 카테고리의 다른 글

[Spring] Reactive Web (2)  (0) 2022.09.27
[Spring] Reactive Web (1)  (0) 2022.09.25
[Java] 비동기 기술  (0) 2022.09.24
[Reactive Streams] Schedulers  (0) 2022.09.21
[Reactive Streams] Operators  (0) 2022.09.14