본문 바로가기

Reactive-Programming

[Reactive Streams] Schedulers

Schedulers

Reactive Stream에서 Scheduler는 Operator를 사용하여 별도의 스레드에서 작업을 처리하게 하는 기능을 의미한다.
즉, Scheduler 방식을 사용해서 작업 스레드와 메인 스레드를 분리한다.

 

| 종류

  • SubscribeOn
    • 데이터를 주는 쪽(Publisher)을 별도의 쓰레드에서 실행
    • Publisher가 느리게 동작할 때 사용 -> Blocking I/O...
    • flux.subscribeOn(Schedulers.single()).subscribe()
Executors.newSingleThreadExecutor().execute(() -> pub.subscribe(s));
  • PublishOn
    • 받아서 처리하는 쪽(Subscriber)을 별도의 쓰레드에서 실행
    • Subscriber가 느리게 동작할 때 사용 -> DB 저장...
pubOnPub.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println(Thread.currentThread().getName() + " [onSubscribe]");
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println(Thread.currentThread().getName() + " [onNext] data = " + integer);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println(Thread.currentThread().getName() + " [onError] ");
    }

    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " [onComplete]");
    }
});

-> 차이점 : SubscribeOn은 주는 쪽이 느리게 동작할 때 사용하고 PublishOn은 받는 쪽이 느리게 동작할 때 사용한다.

-> 두 가지 전략을 같이 사용하는 것도 괜찮다.

 

* 추가

  • interver()
    • 시간을 주기로 무한 생성
    • 데몬 스레드로, 유저 스레드가 존재해야 실행 지속 가능
Publisher<Integer> pub = sub -> {
    sub.onSubscribe(new Subscription() {
        int num = 0;

        @Override
        public void request(long n) {
            ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
            exec.scheduleAtFixedRate(() -> {
                sub.onNext(num++);
            }, 0, 300, TimeUnit.MILLISECONDS);
        }

        @Override
        public void cancel() {
        }
    });
};
  • take()
    • limit 역할
    • 제한된 개수만큼 생성하고 스레드 종료
Publisher<Integer> pub = sub -> {
    sub.onSubscribe(new Subscription() {
        int num = 0;
        boolean cancelled = false;

        @Override
        public void request(long n) {
            ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
            exec.scheduleAtFixedRate(() -> {
                if (cancelled) {
                    exec.shutdown();
                    return;
                }

                sub.onNext(num++);
            }, 0, 300, TimeUnit.MILLISECONDS);
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    });
    };

    Publisher<Integer> takePub = sub -> {
    pub.subscribe(new Subscriber<Integer>() {
        int count = 0;
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            sub.onSubscribe(subscription);
        }

        @Override
        public void onNext(Integer integer) {
            sub.onNext(integer);
            if (count++ < 10) {
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            sub.onError(throwable);
        }

        @Override
        public void onComplete() {
            sub.onComplete();
        }
    });
};

 

* REF

https://www.youtube.com/watch?v=Wlqu1xvZCak&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=3

'Reactive-Programming' 카테고리의 다른 글

[Spring] Reactive Web (2)  (0) 2022.09.27
[Spring] Reactive Web (1)  (0) 2022.09.25
[Java] 비동기 기술  (0) 2022.09.24
[Reactive Streams] Operators  (0) 2022.09.14
[Reactive Streams] Basic  (0) 2022.09.06