Haladó Olvasási idő: ~8 perc

Concurrency API

ExecutorService, Future, CompletableFuture és ForkJoinPool

Concurrency API

Definíció

A Java Concurrency API a task-alapú konkurencia szabványos eszköztára. Ahelyett, hogy közvetlenül nyers threadeket hoznál létre és kezelnél, munkákat küldesz executoroknak, lifecycle-t kezelő fogantyúkat kapsz vissza, például Future vagy CompletableFuture formájában, és speciális poolokra, például ForkJoinPool-ra támaszkodsz az ütemezési stratégia szempontjából. Production nyelvre lefordítva ez azt jelenti, hogy szétválasztod a feladat beadását, a végrehajtási stratégiát, az eredmény továbbítását, a cancellationt és a shutdown folyamatot.

Pont ez a szétválasztás az igazi érték. Egy nyers thread a hívót rögtön hozzáköti egy konkrét végrehajtási erőforráshoz. Egy executorral viszont kapacitásról, sorokról, backpressure-ről és ownershipről lehet gondolkodni. Valós terhelés alatt ezek ugyanannyira correctness kérdések, mint maga az üzleti logika.

Alapfogalmak

Az Executor a minimális absztrakció: „futtasd ezt a feladatot valahogy”. Az ExecutorService ehhez hozzáad lifecycle műveleteket, tömeges beadást, Future eredményeket és shutdown metódusokat. A gyakorlatban azonban nem az a fő döntés, hogy melyik convenience factoryt választod, hanem az, hogy milyen queueing és sizing viselkedést fogadsz el. A fix méretű pool unbounded queue-val stabil threadszámot adhat, miközben a terhelési csúcsot növekvő latencyként és memóriaként rejti el. A cached pool csökkenti a sorban állást, de burst esetén túl sok threadet hozhat létre.

A Future egy függőben lévő eredményt reprezentál. Lehetővé teszi a blokkoló get() hívást, a cancel műveletet és az állapotlekérdezést. A gyengéje a kompozíció: több future összeláncolása gyorsan nested blockingba vagy callback boilerplate-be csúszik. Ezt oldja meg a CompletableFuture, amely aszinkron stage-eket modellez, és képes transzformálni, kombinálni, hibaágat kezelni vagy versenyeztetni eredményeket. Itt a gondolkodásmód már nem az, hogy „várj egy válaszra”, hanem az, hogy „írj le egy függő munkafolyamatot”.

A ForkJoinPool rekurzív feldarabolásra és finomszemcsés taskokra készült work-stealinggel. Minden worker saját deque-t tart, és az üres worker másoktól lop munkát ahelyett, hogy központi sorra várna. Ez CPU-intenzív divide-and-conquer algoritmusoknál hatékony, de blokkoló I/O mellett veszélyes lehet, ha nem érted a kompenzáció és a managedBlock működését. A common poolra küldött blokkoló HTTP vagy adatbázis hívások gyakran starvationhöz vezetnek.

Az API memória-modell garanciákat is hordoz. A task beadása egy executorba, egy Future befejeződése vagy egy CompletableFuture függő stage-ének lezárása mind létrehozhat hasznos publication edge-eket. Ez azonban nem teszi automatikusan szálbiztossá a task belsejében lévő shared state-et. Az executor csak ütemez; a thread safety továbbra is a te felelősséged.

Gyakorlati használat

Ha az operációs viselkedés számít, jobb az explicit poolépítés, mint a convenience factoryk vak használata. Az Executors.newFixedThreadPool() példákban rendben van, de productionben a ThreadPoolExecutor explicit core size, max size, queue típus, thread factory és rejection handler mellett sokkal jobban átlátható. A thread naming és a rejection policy nem apró részletek, hanem a komponens szerződésének részei.

Az executor ownershipöt komolyan kell venni. Ha egy komponens létrehoz egy poolt, jellemzően neki is kell leállítania. Ha a pool megosztott infrastruktúra, a komponens ne zárja le véletlenül. Shutdown során előbb shutdown(), majd awaitTermination(), és csak szükség esetén shutdownNow(). Ha kihagyod a shutdown lépést, thread leak lesz; ha reflexből shutdownNow()-t hívsz, félbehagyhatsz érzékeny munkát.

A CompletableFuture legyen orkhesztrációs eszköz, ne engedély arra, hogy mindent aszinkronná tegyél. Minden stage-nek kell végrehajtási kontextus. Ha nem adsz meg executort, az async variánsok gyakran a common poolt használják, amelybe így különböző workloadok keveredhetnek. Latency-érzékeny rendszereknél a blokkoló vagy üzletileg kritikus stage-ekhez adj dedikált executort, hogy a pool-interferencia látható és kontrollálható maradjon.

Kód példák

ThreadFactory factory = runnable -> {
    Thread t = new Thread(runnable);
    t.setName("pricing-worker-" + THREAD_ID.incrementAndGet());
    t.setDaemon(false);
    return t;
};

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    8,
    16,
    60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(500),
    factory,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

Future<Price> future = executor.submit(() -> pricingService.calculate(order));
Price price = future.get(200, TimeUnit.MILLISECONDS);

Itt minden operációs döntés explicit: bounded queue, bounded max threadszám, névvel ellátott workerek és olyan rejection policy, amely visszanyomja a terhelést a hívóra ahelyett, hogy végtelenül pufferezne.

CompletableFuture<Customer> customerFuture = CompletableFuture
    .supplyAsync(() -> customerClient.fetch(customerId), ioExecutor);

CompletableFuture<Balance> balanceFuture = CompletableFuture
    .supplyAsync(() -> accountClient.fetchBalance(customerId), ioExecutor);

CompletableFuture<Summary> summaryFuture = customerFuture.thenCombine(
    balanceFuture,
    Summary::new
).orTimeout(300, TimeUnit.MILLISECONDS)
 .exceptionally(ex -> Summary.fallback(customerId));

Summary summary = summaryFuture.join();

A lényeg a strukturált kompozíció: két független I/O egyszerre indul, az eredményük kombinálódik, a timeout és a fallback pedig a pipeline része, nem pedig széttördelt imperatív hibakezelés.

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000;
    private final long[] values;
    private final int start;
    private final int end;

    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += values[i];
            return sum;
        }
        int mid = (start + end) >>> 1;
        SumTask left = new SumTask(values, start, mid);
        SumTask right = new SumTask(values, mid, end);
        left.fork();
        long rightResult = right.compute();
        return rightResult + left.join();
    }
}

Ez a klasszikus fork/join minta: az egyik ágat forkolod, a másikat lokálisan számolod, végül joinolsz. Így kisebb a task overhead és jobban kihasználható a work stealing.

Trade-offok

Az executor-alapú konkurencia javítja az újrafelhasználást, az observabilityt és a kontrollt, de olyan queueing szemantikát is behoz, amely rossz választás esetén elrejtheti az overloadot. Az unbounded queue stabil threadszámot ad, miközben a latencyt és a memóriát növeli észrevétlenül. A bounded queue és a rejection explicitté teszi a hibát, viszont feljebb tolja a felelősséget. Nincs ingyenes választás; minden pool egy load-shedding policyt kódol, akár kimondod, akár nem.

A Future egyszerű és jó egy-egy task eredményére, főleg ha amúgy is van blokkoló boundary. A CompletableFuture kifejezőbb, de a hosszú láncok nehezebben debuggolhatók, és a kivételterjedés szabályait érteni kell. A ForkJoinPool kiváló CPU-bound rekurzív munkára, de gyenge választás tetszőleges blokkoló feladatra, ha nem érted a worker-modellt.

A fő trade-off architekturális: a task API-k leválasztják a beadást a végrehajtásról, ami erős eszköz, de könnyű elveszíteni az ownershipöt. Ha senki nem tudja, ki birtokolja a poolt, ki monitorozza a telítettséget, és ki definiálja a cancellation szemantikát, akkor a kód konkurens, de a rendszer operatívan kaotikus.

Gyakori hibák

Visszatérő hiba az Executors factoryk vak használata. Például a fixed thread pool unbounded LinkedBlockingQueue-t használ, ezért az overload növekvő queue méretként és latencyként jelenik meg azonnali rejection helyett. Ugyancsak gyakori, hogy a tesztekben, háttérszolgáltatásokban vagy CLI utilitykben létrehozott executorokat nem állítják le, emiatt non-daemon threadek életben tartják a JVM-et.

A Future esetén sok fejlesztő túl korán blokkol. Ha minden submit után rögtön future.get() következik, akkor kifizetted a concurrency komplexitását, de throughputot nem nyertél. A cancel szemantikát is sokan félreértik: a future.cancel(true) interruptot kér, de a tasknak együttműködően kell leállnia, különben a cancellation csak papíron történik meg.

A CompletableFuture legnagyobb csapdája a common pool véletlen használata blokkoló munkához. Egy másik buktató a join() és a get() keverése az exception modell ismerete nélkül: a join() CompletionException-be csomagol, míg a get() checked wrapper kivételeket dob. ForkJoinPool esetén worker I/O-n vagy hosszú lockvárakozáson történő blokkolása starvationt okozhat, mert ez a scheduler alapvetően CPU munkára van optimalizálva.

Senior szintű meglátások

Senior szemmel a pool erőforrás-governance eszköz, nem convenience wrapper. A poolméret egy állítás az elérhető párhuzamosságról, a queue mérete egy állítás a tolerált latencyről és memóriapufferelésről, a rejection policy pedig egy állítás a hibamódról. Ha ezek a számok nincsenek SLA-hoz, downstream kapacitáshoz és mért workload formához kötve, akkor csak találgatások.

A CompletableFuture kompozíció tükrözze a domain boundary-kat. A független távoli hívások párhuzamosíthatók, a függő stage-eknek pedig a kauzalitást és a timeout budgetet kell megőrizniük. Ha minden alapértelmezés szerint async lesz, romlik a trace-elhetőség és a failure attribution. A jó tervek kevés, explicit és jól mérhető aszinkron határt tartalmaznak.

A ForkJoinPool common pool processz-szintű megosztott infrastruktúra. Úgy kezeld, mint egy közös adatbázis connection poolt: ne önts bele ismeretlen workloadot gondolkodás nélkül. Blokkoló munka menjen dedikált executorra. CPU-bound divide-and-conquer menjen fork/joinra. A legtöbb production tanulság itt nem a szintaxisról, hanem a workload forma és a scheduler design összeillesztéséről szól.

Szószedet

  • Executor: Minimális absztrakció taskok futtatására.
  • ExecutorService: Lifecycle és eredménykezelést is adó executor.
  • ThreadPoolExecutor: Konfigurálható thread pool implementáció.
  • Future: Függőben lévő eredmény vagy cancellation állapot fogantyúja.
  • CompletableFuture: Komponálható aszinkron stage API.
  • ForkJoinPool: Work-stealing scheduler finomszemcsés CPU taskokra.
  • Rejection policy: Stratégia arra az esetre, amikor a pool nem tud több munkát fogadni.
  • Work stealing: Olyan ütemezés, ahol az üres worker más workertől lop feladatot.

Gyorsreferencia

  • Inkább task beadás, mint kézi thread kezelés.
  • Poolméret, queue, threadnév és rejection policy legyen explicit.
  • Birtokold a lifecycle-t: shutdown(), awaitTermination(), majd szükség esetén eszkaláció.
  • Backpressure esetén használj bounded queue-t.
  • Egyszerű egyedi eredményhez Future, kompozícióhoz CompletableFuture.
  • Ne hívd rögtön minden submit után a future.get()-et, hacsak tényleg blokkolni akarsz.
  • Blokkoló vagy kritikus stage-ekhez adj explicit executort.
  • Tetszőleges blokkoló I/O ne menjen a ForkJoinPool common poolra.
  • Mérd a telítettséget: aktív threadek, queue mélység, rejectionök, task latency.
  • A pool design production szerződés, nem implementációs részlet.

🎮 Játékok

10 kérdés