Schedulers
Reactive Stream에서 Scheduler는 Operator를 사용하여 별도의 스레드에서 작업을 처리하게 하는 기능을 의미한다.
즉, Scheduler 방식을 사용해서 작업 스레드와 메인 스레드를 분리한다.
| 종류
- SubscribeOn
- 데이터를 주는 쪽(Publisher)을 별도의 쓰레드에서 실행
- Publisher가 느리게 동작할 때 사용 -> Blocking I/O...
- flux.subscribeOn(Schedulers.single()).subscribe()
Executors.newSingleThreadExecutor().execute(() -> pub.subscribe(s));
- PublishOn
- 받아서 처리하는 쪽(Subscriber)을 별도의 쓰레드에서 실행
- Subscriber가 느리게 동작할 때 사용 -> DB 저장...
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println(Thread.currentThread().getName() + " [onSubscribe]");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
System.out.println(Thread.currentThread().getName() + " [onNext] data = " + integer);
}
@Override
public void onError(Throwable t) {
System.out.println(Thread.currentThread().getName() + " [onError] ");
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " [onComplete]");
}
});
-> 차이점 : SubscribeOn은 주는 쪽이 느리게 동작할 때 사용하고 PublishOn은 받는 쪽이 느리게 동작할 때 사용한다.
-> 두 가지 전략을 같이 사용하는 것도 괜찮다.
* 추가
- interver()
- 시간을 주기로 무한 생성
- 데몬 스레드로, 유저 스레드가 존재해야 실행 지속 가능
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
int num = 0;
@Override
public void request(long n) {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(() -> {
sub.onNext(num++);
}, 0, 300, TimeUnit.MILLISECONDS);
}
@Override
public void cancel() {
}
});
};
- take()
- limit 역할
- 제한된 개수만큼 생성하고 스레드 종료
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
int num = 0;
boolean cancelled = false;
@Override
public void request(long n) {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(() -> {
if (cancelled) {
exec.shutdown();
return;
}
sub.onNext(num++);
}, 0, 300, TimeUnit.MILLISECONDS);
}
@Override
public void cancel() {
cancelled = true;
}
});
};
Publisher<Integer> takePub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
int count = 0;
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
sub.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
if (count++ < 10) {
subscription.cancel();
}
}
@Override
public void onError(Throwable throwable) {
sub.onError(throwable);
}
@Override
public void onComplete() {
sub.onComplete();
}
});
};
* REF
https://www.youtube.com/watch?v=Wlqu1xvZCak&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=3
'Reactive-Programming' 카테고리의 다른 글
[Spring] Reactive Web (2) (0) | 2022.09.27 |
---|---|
[Spring] Reactive Web (1) (0) | 2022.09.25 |
[Java] 비동기 기술 (0) | 2022.09.24 |
[Reactive Streams] Operators (0) | 2022.09.14 |
[Reactive Streams] Basic (0) | 2022.09.06 |