CompletableFuture
- Java 8에서 부터 사용 가능하다.
- 비동기 작업 결과
- Future -> 비동기 작업의 결과를 담고 있는 Object
- ListenableFuture -> Callback 구조로 결과 처리
- CompletableFuture -> 다양한 비동기 작업을 간단하게 수행할 수 있다.
- Future, CompletionStage를 구현하고 있다.
- CompletionStage -> 완료한 결과에 의존적으로 다른 작업을 실행할 수 있는 기능을 제공한다.
- 별도의 스레드 풀을 가질 수 있다.
- 리스트의 모든 값이 완료될 때까지 기다리거나 하나의 값만 완료되길 기다릴지 선택할 수 있다.
- 람다 표현식, 파이프 라인닝을 활용해 Callback Hell 개선
제공 메서드
- CompletableFuture.completedFuture() -> 이미 작업이 완료된 CompletableFuture 객체를 생성할 수 있다.
CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1);
System.out.println(f1.get());
실행 결과 : 1
- completableExceptionally() -> 예외 발생 시 예외를 담는다.
CompletableFuture<Integer> f3 =new CompletableFuture<>();
f3.completeExceptionally(new RuntimeException()); // 예외를 담고 있는다.
System.out.println(f3.get()); // get을 호출해야 예외가 발생한다.
실행 결과 :
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException
- runAsync() -> Runnable을 받는다 : 결과 값 없는 비동기 작업 실행
CompletableFuture.runAsync(() -> {
log.info("runAsync");
});
log.info("exit");
실행 결과 :
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - exit
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - runAsync
- thenRun() -> 의존적으로 다음 작업 실행 : 동일 스레드 보장
CompletableFuture
.runAsync(() -> log.info("runAsync"))
.thenRun(() -> log.info("thenRunAsync1"))
.thenRun(() -> log.info("thenRunAsync2"));
실행 결과 :
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - runAsync
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenRunAsync1
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenRunAsync2
- supplyAsync() -> Supplier를 받는다 : 결과 값이 존재하는 비동기 작업 실행
- thenApply() -> Function을 받는다 : 앞의 작업의 결과물을 이용해서 작업 처리 가능, 반환 값 존재
- thenAccept() -> Consumer를 받는다 : 결과를 받아서 처리, 반환 값 X
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
.thenApply(res -> {
log.info("theApply {}", res);
return res + 1;
})
.thenAccept(res -> log.info("thenAccept {}", res));
실행 결과 :
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - supplyAsync
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - theApply 1
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAccept 2
- thenCompse() -> CompletableFuture를 반환할 때 사용 : flatMap과 비슷한 역할
CompletableFuture
.supplyAsync(() -> {
return 1;
})
.thenCompose(res -> {
log.info("thenCompose {}", res);
return CompletableFuture.completedFuture( res + 1);
})
.thenAccept(res -> log.info("thenAccept {}", res));
실행 결과 :
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAccept 2
- exceptionally() -> 작업 도중 에러가 발생했을 때 처리
CompletableFuture
.supplyAsync(() -> {
return 1;
})
.thenCompose(res -> {
log.info("thenCompose {}", res);
if (res == 1) throw new RuntimeException();
return CompletableFuture.completedFuture( res + 1);
})
.exceptionally(e -> {
return -10;
})
.thenAccept(res -> log.info("thenAccept {}", res));
실행 결과 :
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
- thenApplyAsync() -> Function을 받음, 다른 스레드를 사용하여 비동기 작업 실행
ExecutorService es = Executors.newFixedThreadPool(10);
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
}, es)
.thenCompose(res -> {
log.info("thenCompose {}", res);
return CompletableFuture.completedFuture(res);
})
.thenApplyAsync(res -> {
log.info("thenApplyAsync {}", res);
return res + 10;
}, es)
.thenAcceptAsync(res -> log.info("thenAcceptAsync {}", res), es);
실행 결과 :
[pool-1-thread-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - supplyAsync
[pool-1-thread-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[pool-1-thread-2] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenApplyAsync 1
[pool-1-thread-3] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAcceptAsync 11
* CompletableFuture를 사용하면 이전 포스팅에서 완성한 코드보다 다음과 같이 더 간결하게 코드를 작성할 수 있다.
@Slf4j
@RestController
@RequestMapping("/v4")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV4 {
public static final String Service1Uri = "http://localhost:9090/remote-service?req={req}";
public static final String Service2Uri = "http://localhost:9090/remote-service2?req={req}";
private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
// Netty를 사용하여 Thread 1개를 사용하도록 설정
private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
);
private final MyServiceV2 myServiceV2;
public SpringAsyncControllerV4(MyServiceV2 myServiceV2) {
this.myServiceV2 = myServiceV2;
}
/**
* 의존성이 존재하는 경우
*/
@GetMapping("/rest4")
public DeferredResult rest4(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
toCompletableFuture(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
.thenCompose(res ->
toCompletableFuture(asyncRestTemplate2.getForEntity(Service2Uri, String.class, res.getBody()))
)
.thenCompose(res ->
toCompletableFuture(myServiceV2.work(res.getBody()))
)
.thenAccept(res -> dr.setResult(res))
.exceptionally(ex -> { dr.setErrorResult(ex.getMessage()); return null; });
return dr;
}
<T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
lf.addCallback(
res -> completableFuture.complete(res),
ex -> completableFuture.completeExceptionally(ex)
);
return completableFuture;
}
}
* 동기적 서비스 로직을 사용했을 경우
@Service
public class MyServiceV4 {
public String work(String req) {
return req + "/asyncWork";
}
}
@Slf4j
@RestController
@RequestMapping("/v4")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV4 {
public static final String Service1Uri = "http://localhost:9090/remote-service?req={req}";
public static final String Service2Uri = "http://localhost:9090/remote-service2?req={req}";
private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
// Netty를 사용하여 Thread 1개를 사용하도록 설정
private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
);
private final MyServiceV4 myServiceV4;
public SpringAsyncControllerV4(MyServiceV4 myServiceV4) {
this.myServiceV4 = myServiceV4;
}
/**
* 의존성이 존재하는 경우
*/
@GetMapping("/rest4")
public DeferredResult rest4(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
toCompletableFuture(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
.thenCompose(res ->
toCompletableFuture(asyncRestTemplate2.getForEntity(Service2Uri, String.class, res.getBody()))
)
.thenApplyAsync(res ->
myServiceV4.work(res.getBody())
)
.thenAccept(res -> dr.setResult(res))
.exceptionally(ex -> { dr.setErrorResult(ex.getMessage()); return null; });
return dr;
}
<T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
lf.addCallback(
res -> completableFuture.complete(res),
ex -> completableFuture.completeExceptionally(ex)
);
return completableFuture;
}
}
=> CompletableFuture를 사용해서 비동기적으로 실행시킬 수 있다.
* REF
https://www.youtube.com/watch?v=PzxV-bmLSFY&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=7
'Reactive-Programming' 카테고리의 다른 글
[Reactor] Mono란? (0) | 2022.10.03 |
---|---|
[Spring] WebFlux란? (0) | 2022.10.02 |
[Spring] Reactive Web (3) (0) | 2022.09.30 |
[Spring] Reactive Web (2) (0) | 2022.09.27 |
[Spring] Reactive Web (1) (0) | 2022.09.25 |