Reactive Programming
- 외부에서 데이터나 이벤트가 발생하면 그것에 맞게 대응하도록 프로그래밍하는 기법
1. Duality(쌍대성)
- Iterable
- Collection -> Iterable
public class IterableCustom {
public static void main(String[] args) {
Iterable<Integer> iter = () -> new Iterator<>() {
final int MAX = 10;
int cursor = 0;
@Override public boolean hasNext() {
return cursor < MAX;
}
@Override public Integer next() {
return ++cursor; }
};
for (Integer integer : iter) {
System.out.println(integer);
}
}
}
- Observable
- Observable은 Java 9부터 Deprecated 되었다.
- Observer를 Oberservable에게 등록시킨 후 Data/Event 발생 시 해당 값을 Observer에게 전송.
- 단점
- 완료/종료를 알 수 없다. -> 연속되는 데이터의 끝, Complete의 개념이 없다.
- Error 처리의 미흡
@SuppressWarnings("deprecation")
public class ObservableCustom {
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
setChanged();
notifyObservers(i);
}
}
}
public static void main(String[] args) {
Observer ob = (o, arg) -> System.out.println(arg);
IntObservable intObservable = new IntObservable();
intObservable.addObserver(ob);
intObservable.run();
}
}
> Iterable은 Pull, 받는 쪽에서 데이터를 당겨 온다.
> Observerable은 Push, 받는 쪽으로 데이터를 밀어 넣는다.
=> int i = it.next(void) <-> void notifyObservers(i) - 쌍대성
Reactive Stream
- non-blocking backPressure로 비동기적 스트림 처리를 제공하는 표준
- Back Pressure(배압)
- 빠른 속도의 Publisher 와 느린 속도의 Subscriber 문제를 해결하는 원리
- Subscriber 가 처리할 수 있는 만큼의 데이터를 전송한다.
- Back Pressure(배압)
- 관찰자 패턴(Observer Pattern), 반복자 패턴(Iterator Pattern), 함수형 패러다임의 조합으로 정의
- ObserverPattern의 확장된 개념을 포함하고 있다.
- Interface Summary
- Publisher<T>
- Observable 역할
- 연속된 요소들을 제공한다.
- Subscriber 에게 요소들을 전달한다.
- .addObserver() 대신 .subscribe() 사용한다.
- Publisher 가 Subscriber 에게 전달 해야 하는 요소로 onSubscribe, onNext*, (onError | onComplete)? 가 존재한다.
- Subscriber<T>
- Observer 역할
- Observer 역할
- Subscription
- Processor<T, R>
- Publisher<T>
- Subscriber -> Publisher : 구독
- Publisher -> Subscriber : Subscription 전달
- Subscriber : request 로 처리할 수 있는 데이터 설정 (Back Pressure)
- Publisher : Subscription 을 통해 데이터 전달
- 완료 시 onComplete, 오류 시 onError 종료
위 개념을 다음과 같이 코드로 간략하게 표현할 수 있다.
/**
* Created by YC on 2022/09/06.
*/
public class PubSub {
public static void main(String[] args) {
Iterable<Integer> iter = List.of(1, 2, 3, 4, 5);
ExecutorService es = Executors.newSingleThreadExecutor();
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = iter.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
es.execute(() -> {
int i = 0;
try {
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<>() {
Subscription subscription;
final int MAX = 1;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " onSubscribe");
this.subscription = subscription;
subscription.request(MAX);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " onNext -> " + item);
this.subscription.request(MAX);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
p.subscribe(s);
try {
es.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
es.shutdown();
}
}
}
REF.
https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=1
'Reactive-Programming' 카테고리의 다른 글
[Spring] Reactive Web (2) (0) | 2022.09.27 |
---|---|
[Spring] Reactive Web (1) (0) | 2022.09.25 |
[Java] 비동기 기술 (0) | 2022.09.24 |
[Reactive Streams] Schedulers (0) | 2022.09.21 |
[Reactive Streams] Operators (0) | 2022.09.14 |