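Summary of the previous post
Publisher (supplier) -> Subscriber registers with the Publisher -> Subscriber sets the maximum amount it can process (onSubscribe) -> Publisher calls the Subscriber's onNext for up to the requested amount -> once every item has been delivered, the Publisher calls the Subscriber's onComplete; if an error occurs, it calls onError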
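The flow above can be sketched with a Publisher that actually honors the requested amount. This is only a minimal, synchronous illustration that assumes the JDK `java.util.concurrent.Flow` interfaces; the names `DemandSketch` and `CollectingSubscriber` are hypothetical and do not come from the previous post.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class DemandSketch {
    // Hypothetical helper: emits at most n items per request(n) call.
    static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
        return sub -> {
            Iterator<Integer> it = iter.iterator();
            sub.onSubscribe(new Subscription() {
                boolean done = false; // set on completion, error, or cancel

                @Override
                public void request(long n) {
                    if (done) return;
                    try {
                        // Deliver only as many items as the Subscriber asked for.
                        while (n-- > 0 && it.hasNext()) {
                            sub.onNext(it.next());
                            if (done) return; // cancelled or finished during onNext
                        }
                        if (!it.hasNext() && !done) {
                            done = true;
                            sub.onComplete();
                        }
                    } catch (Throwable t) {
                        done = true;
                        sub.onError(t);
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Records every signal so the demand-driven flow can be inspected.
    static class CollectingSubscriber implements Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        Subscription subscription;

        @Override public void onSubscribe(Subscription s) { subscription = s; events.add("onSubscribe"); }
        @Override public void onNext(Integer item) { events.add("onNext " + item); }
        @Override public void onError(Throwable t) { events.add("onError"); }
        @Override public void onComplete() { events.add("onComplete"); }
    }

    public static void main(String[] args) {
        CollectingSubscriber sub = new CollectingSubscriber();
        iterPublisher(List.of(1, 2, 3, 4, 5)).subscribe(sub);
        sub.subscription.request(2);  // only two items are delivered
        System.out.println(sub.events);
        sub.subscription.request(10); // the remaining items, then onComplete
        System.out.println(sub.events);
    }
}
```

The listings later in this post ignore the argument of request(n) and push everything at once, which is fine for a demo because the Subscriber requests Long.MAX_VALUE.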
Operators
- Publisher -> [Data1] -> Operator -> [Data2] -> Operator -> [Data3] ... -> Subscriber
- An Operator transforms the data on its way downstream.
- The Publisher and Subscriber do not have to be connected directly; Operators can sit between them.
Map
Publisher -> [Data1] -> mapPub -> [Data2] -> Subscriber
| Code
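// Assumption: the JDK 9+ Flow API is used here; org.reactivestreams provides equivalent interfaces.
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperatorPubSub {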
public static void main(String[] args) {
Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, i -> i * 10);
Publisher<Integer> mapPub2 = mapPub(mapPub, i -> -i);
mapPub2.subscribe(printSubscribe());
}
private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(num -> sub.onNext(num));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> function) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer item) {
sub.onNext(function.apply(item));
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
});
}
};
}
private static Subscriber<Integer> printSubscribe() {
return new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer item) {
System.out.println("[onNext] item = " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("[onError] throwable = " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
}
=> Inside mapPub, a new Subscriber is created that transforms each item and then passes it on to the downstream Subscriber.
Reduce
Publisher -> [Data1, ...] -> reducePub -> [Result] -> Subscriber
| Code
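// Assumption: the JDK 9+ Flow API is used here; org.reactivestreams provides equivalent interfaces.
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperatorPubSub {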
public static void main(String[] args) {
Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> reducePub = reducePub(pub, 0, (a, b) -> a + b);
reducePub.subscribe(printSubscribe());
}
private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
return sub -> {
pub.subscribe(new DelegateSub(sub) {
int result = init;
@Override
public void onNext(Integer item) {
result = bf.apply(this.result, item);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
};
}
private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(num -> sub.onNext(num));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
private static Subscriber<Integer> printSubscribe() {
return new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer item) {
System.out.println("[onNext] item = " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("[onError] throwable = " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
}
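The Reduce listing relies on a DelegateSub class that is not shown in this post. A minimal sketch of what it could look like follows, assuming the JDK `java.util.concurrent.Flow` interfaces and an Integer-only pipeline; the field name `sub` is an assumption chosen to match how the anonymous subclass above uses it.

```java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// Assumed shape of DelegateSub: forwards every signal to the downstream
// Subscriber, so an operator only overrides the callbacks it wants to change.
public class DelegateSub implements Subscriber<Integer> {
    protected final Subscriber<? super Integer> sub;

    public DelegateSub(Subscriber<? super Integer> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(Integer item) {
        sub.onNext(item);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}
```

With this base class, reducePub only has to override onNext (to accumulate) and onComplete (to emit the single result before completing); onSubscribe and onError are inherited unchanged.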
Applying Generics
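// Assumption: the JDK 9+ Flow API is used here; org.reactivestreams provides equivalent interfaces.
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperatorPubSub {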
public static void main(String[] args) {
Publisher<Integer> pub = iterPublisher(Stream.iterate(1, i -> i + 1).limit(10).collect(Collectors.toList()));
// 1. Map -> emits the same number of items as it receives
Publisher<String> mapPub = mapPub(pub, i -> "[" + i + "]");
mapPub.subscribe(printSubscribe());
// 2. reduce -> emits a different number of items (a single result)
Publisher<String> reducePub = reducePub(pub, "", (a, b) -> a + b);
reducePub.subscribe(printSubscribe());
}
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return sub -> {
pub.subscribe(new DelegateSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T item) {
result = bf.apply(this.result, item);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
};
}
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> function) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
@Override
public void onNext(T item) {
sub.onNext(function.apply(item));
}
});
}
};
}
private static Publisher<Integer> iterPublisher(Iterable<Integer> iter) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(num -> sub.onNext(num));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
private static <T> Subscriber<T> printSubscribe() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T item) {
System.out.println("[onNext] item = " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("[onError] throwable = " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
}
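The generic listing needs a two-parameter DelegateSub<T, R>, which is also not shown here. One possible sketch, again assuming the JDK `java.util.concurrent.Flow` interfaces: T is the upstream item type and R the downstream one. Making onNext abstract is a design choice of this sketch, not something the post confirms; it avoids an unchecked cast from T to R, and both mapPub and reducePub override onNext anyway.

```java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// Assumed generic shape: receives T from upstream, emits R downstream.
public abstract class DelegateSub<T, R> implements Subscriber<T> {
    protected final Subscriber<? super R> sub;

    public DelegateSub(Subscriber<? super R> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    // Each operator decides how a T becomes zero or more R values.
    @Override
    public abstract void onNext(T item);

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}
```

Note that main subscribes to the same pub twice; that works in this demo because iterPublisher iterates over a List, which can be traversed again for each new Subscriber.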
Reactor
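Reactor is an implementation of Reactive Streams and the library that Spring Framework has supported for reactive programming since version 5.
* Implementation types
- Flux
- A Publisher implementation
- A publisher that emits a stream of 0..N elements.
- Simple data output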
Flux.create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.subscribe(System.out::println);
- log() -> Operator: logs each signal as it passes through
- subscribe(Consumer consumer): the Consumer corresponds to onNext()
Flux.just(1, 2, 3)
.log()
.map(item -> item * 2)
.log()
.subscribe(System.out::println);
- just(): the most basic way to create a sequence
- map() -> Operator
Flux.just(1, 2, 3)
.log()
.map(item -> item * 2)
.reduce(0, (a, b) -> a + b)
.log()
.subscribe(System.out::println);
- reduce(): accumulates the values and returns a Mono
Returning a Publisher
| Code
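import org.reactivestreams.Publisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController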
public class HelloController {
@GetMapping("/hello")
public Publisher<String> hello(String name) {
return Flux.just("Hello " + name);
}
}
- Spring calls subscribe() automatically
REF.
https://www.youtube.com/watch?v=DChIxy9g19o&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=2