CompletableFuture ์‚ฌ์šฉ๋ฐฉ๋ฒ•

๋“ค์–ด๊ฐ€๋ฉฐ

Future ์ธํ„ฐํŽ˜์ด์Šค๋Š” java5๋ถ€ํ„ฐ java.util.concurrency ํŒจํ‚ค์ง€์—์„œ ๋น„๋™๊ธฐ์˜ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฐ›๋Š” ์šฉ๋„๋กœ ์‚ฌ์šฉํ–ˆ๋‹ค. ํ•˜์ง€๋งŒ ๋น„๋™๊ธฐ์˜ ๊ฒฐ๊ณผ๊ฐ’์„ ์กฐํ•ฉํ•˜๊ฑฐ๋‚˜, error๋ฅผ ํ•ธ๋“ค๋งํ•  ์ˆ˜๊ฐ€ ์—†์—ˆ๋‹ค.

์ž๋ฐ”8๋ถ€ํ„ฐ CompletableFuture ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์†Œ๊ฐœ๋˜์—ˆ๊ณ , Future ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•จ๊ณผ ๋™์‹œ์— CompletionStage ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ๋‹ค. CompletionStage๋Š” ๋น„๋™๊ธฐ ์—ฐ์‚ฐ Step์„ ์ œ๊ณตํ•ด์„œ ๊ณ„์† ์ฒด์ด๋‹ ํ˜•ํƒœ๋กœ ์กฐํ•ฉ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

๊ธฐ๋ณธ์ ์ธ ์‚ฌ์šฉ๋ฐฉ๋ฒ•

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Hello");
cf.get(); // Hello

supplyAsync ์ •์  ๋ฉ”์„œ๋“œ๋Š” Supplier ํƒ€์ž…์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๋„˜๊ธด๋‹ค. ๊ฐ’์„ ์ œ๊ณตํ•ด์ฃผ๊ธฐ๋•Œ๋ฌธ์— CompletableFuture์— ๋‹ด์„ ์ˆ˜ ์žˆ๋‹ค. get()์„ ํ†ตํ•ด์„œ ๋‹ด๊ธด ๊ฐ’์„ ๊บผ๋‚ด์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

CompletableFuture.runAsync(()->log.info("runAsync"));

๋ฐ˜๋ฉด runAsync๋Š” Runnable ํƒ€์ž…์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ „๋‹ฌํ•œ๋‹ค. ๋‹น์—ฐํžˆ Runnable์€ ์–ด๋–ค ๊ฒฐ๊ณผ๊ฐ’์„ ๋‹ด์ง€ ์•Š๋Š”๋‹ค. run ๋ฉ”์„œ๋“œ์˜ ๋ฆฌํ„ด ํƒ€์ž…์ด void์ด๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

Runnable์€ run๋ฉ”์„œ๋“œ void๋ฅผ ๊ฐ€์ง€๋Š” Functional Interface์ด๋‹ค.

CompletableFuture.supplyAsync(() -> {
    log.info("supplyAsync");
    return "Hello";
}).thenApplyAsync(s -> {
    log.info("thenApplyAsync : {}", s);
    return s + "world";
}).thenAcceptAsync(s -> {
    log.info("thenAccept: {}", s);
});

๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ฒด์ด๋‹ ํ˜•ํƒœ๋กœ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

20:54:25.578 [ForkJoinPool.commonPool-worker-1] INFO com.example.demo.practice.CFutureEx1 - supplyAsync
20:54:25.583 [ForkJoinPool.commonPool-worker-1] INFO com.example.demo.practice.CFutureEx1 - thenApplyAsync : Hello
20:54:25.586 [ForkJoinPool.commonPool-worker-1] INFO com.example.demo.practice.CFutureEx1 - thenAccept: Hello world

๋ชจ๋‘๋‹ค xxxAsync๊ฐ€ ๋ถ™์€ ๋ฉ”์„œ๋“œ๋Š” ๊ธฐ๋ณธ ForkJoinPool. commonPool()์„ ์‚ฌ์šฉํ•˜๊ณ , ๊ธฐ์กด์˜ thread๋ฅผ ํ™œ์šฉํ•˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

๋‹ค๋ฅธ Executor ๋„˜๊ธฐ๊ธฐ

๊ธฐ๋ณธ ์ œ๊ณตํ•ด์ฃผ๋Š” ForkJoinPool์˜ commonPool ๋ง๊ณ , ์šฐ๋ฆฌ๊ฐ€ ์ •์˜ํ•œ Executor๋ฅผ ๋„˜๊ธธ๋ ค๋ฉด ๊ฐ ๋ฉ”์„œ๋“œ์˜ 2๋ฒˆ์งธ ์ธ์ž๋กœ ๋„˜๊ธธ์ˆ˜ ์žˆ๋‹ค.

ExecutorService es = Executors.newFixedThreadPool(10);

CompletableFuture.supplyAsync(() -> {
    log.info("supplyAsync");
    return "Hello";
}, es).thenApplyAsync(s -> {
    log.info("thenApplyAsync : {}", s);
    return s + " world";
}, es).thenAcceptAsync(s -> {
    log.info("thenAccept: {}", s);
}, es);
21:07:24.057 [pool-1-thread-1] INFO com.example.demo.practice.CFutureEx1 - supplyAsync
21:07:24.063 [pool-1-thread-2] INFO com.example.demo.practice.CFutureEx1 - thenApplyAsync : Hello
21:07:24.065 [pool-1-thread-3] INFO com.example.demo.practice.CFutureEx1 - thenAccept: Hello world

์Šค๋ ˆ๋“œ ํ’€์„ ๋งŒ๋“ค์–ด์„œ, ๊ฐ๊ฐ ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ๊ฐ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌํ•œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

thenApply vs thenCompose

ํ”ํžˆ ๋‘๊ฐœ์˜ ๋ฉ”์„œ๋“œ๋ฅผ ํ—ท๊ฐˆ๋ฆฌ๋Š”๋ฐ, ๊ฒฐ๊ตญ์—๋Š” CompletableFuture๋ฅผ returnํ•˜๊ณ , ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ Function<T,U> ํƒ€์ž…์„ ๋ฐ›๋Š”๋‹ค. ํ”ํžˆ, thenApply๋Š” ์ŠคํŠธ๋ฆผ์˜ map์— ๋น„์œ ํ•˜๊ณ , thenCompose๋Š” flatMap์— ๋น„์œ ํ•œ๋‹ค. ์•„๋ž˜ ์‹ค์ œ CompletableFuture์— ์ •์˜๋˜์–ด์žˆ๋Š” ๋ฉ”์„œ๋“œ 2๊ฐœ๋ฅผ ์‚ดํŽด๋ณด์ž.

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

๋‹ค๋ฅธ์ ์€ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›๋Š” Function์— Tํƒ€์ž…์„ -> Uํƒ€์ž…์œผ๋กœ ๋ณ€ํ™˜ํ• ๋•Œ, thenCompose๊ฐ™์€ ๊ฒฝ์šฐ์—๋Š” CompletionStage ํƒ€์ž…์œผ๋กœ๋งŒ ๋ณ€ํ™˜์„ ํ•ด์•ผํ•œ๋‹ค. ๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ž‘์„ฑํ•˜๋ฉด IDE์—์„œ ์ปดํŒŒ์ผ ์—๋Ÿฌ๊ฐ€ ๋‚œ๋‹ค. ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์ž‘์„ฑํ• ๋ ค๋ฉด completedFuture ๋ฉ”์„œ๋“œ๋กœ ์ƒˆ๋กœ์šด CompletableFuture ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด์•ผ ํ•œ๋‹ค.

//OK
CompletableFuture.supplyAsync(() -> 1)
    .thenApply(i -> i + 1)
    .thenAccept(System.out::println);

// ์ปดํŒŒ์ผ์—๋Ÿฌ
CompletableFuture.supplyAsync(() -> 1)
    .thenCompose(i -> i * 3) // compile ์—๋Ÿฌ
    .thenAccept(System.out::println);

// OK
CompletableFuture.supplyAsync(() -> 1)
    .thenCompose(i -> CompletableFuture.completedFuture(i * 3))
    .thenAccept(System.out::println);

์—๋Ÿฌ ํ•ธ๋“ค๋ง

์ฒ˜์Œ์— ์ด์•ผ๊ธฐํ–ˆ๋˜ Future์—์„œ ์—๋Ÿฌ๋ฅผ ํ•ธ๋“ค๋ง ํ• ์ˆ˜ ์—†์—ˆ๋˜ ๋ฌธ์ œ๋ฅผ CompletableFuture์—์„œ๋Š” ์–ด๋–ป๊ฒŒ ํ•ด๊ฒฐํ–ˆ์„๊นŒ?

CompletableFuture.supplyAsync(() -> {
    if (1 == 1) throw new RuntimeException();
    log.info("supplyAsync");
    return "Hello";
}).thenApplyAsync(s -> {
    log.info("thenApplyAsync : {}", s);
    return s + " world";
}).thenAcceptAsync(s -> {
    log.info("thenAccept: {}", s);
}).exceptionally(e -> {
    log.error("error handling :{} ", e);
    return null;
});

supplyAsync, thenApplyAsync, thenAcceptAsync ๋ฉ”์„œ๋“œ ์ค‘์—์„œ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด exceptionally() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด์„œ ์—๋Ÿฌ๋ฅผ ํ•ธ๋“ค๋งํ•  ์ˆ˜ ์žˆ๋‹ค.

์—ฐ๊ด€ ํฌ์ŠคํŠธ