Concurrency API
ExecutorService, Future, CompletableFuture és ForkJoinPool
Concurrency API
Definíció
A Java Concurrency API a task-alapú konkurencia szabványos eszköztára. Ahelyett, hogy közvetlenül nyers threadeket hoznál létre és kezelnél, munkákat küldesz executoroknak, lifecycle-t kezelő fogantyúkat kapsz vissza, például Future vagy CompletableFuture formájában, és speciális poolokra, például ForkJoinPool-ra támaszkodsz az ütemezési stratégia szempontjából. Production nyelvre lefordítva ez azt jelenti, hogy szétválasztod a feladat beadását, a végrehajtási stratégiát, az eredmény továbbítását, a cancellationt és a shutdown folyamatot.
Pont ez a szétválasztás az igazi érték. Egy nyers thread a hívót rögtön hozzáköti egy konkrét végrehajtási erőforráshoz. Egy executorral viszont kapacitásról, sorokról, backpressure-ről és ownershipről lehet gondolkodni. Valós terhelés alatt ezek ugyanannyira correctness kérdések, mint maga az üzleti logika.
Alapfogalmak
Az Executor a minimális absztrakció: „futtasd ezt a feladatot valahogy”. Az ExecutorService ehhez hozzáad lifecycle műveleteket, tömeges beadást, Future eredményeket és shutdown metódusokat. A gyakorlatban azonban nem az a fő döntés, hogy melyik convenience factoryt választod, hanem az, hogy milyen queueing és sizing viselkedést fogadsz el. A fix méretű pool unbounded queue-val stabil threadszámot adhat, miközben a terhelési csúcsot növekvő latencyként és memóriaként rejti el. A cached pool csökkenti a sorban állást, de burst esetén túl sok threadet hozhat létre.
A Future egy függőben lévő eredményt reprezentál. Lehetővé teszi a blokkoló get() hívást, a cancel műveletet és az állapotlekérdezést. A gyengéje a kompozíció: több future összeláncolása gyorsan nested blockingba vagy callback boilerplate-be csúszik. Ezt oldja meg a CompletableFuture, amely aszinkron stage-eket modellez, és képes transzformálni, kombinálni, hibaágat kezelni vagy versenyeztetni eredményeket. Itt a gondolkodásmód már nem az, hogy „várj egy válaszra”, hanem az, hogy „írj le egy függő munkafolyamatot”.
A ForkJoinPool rekurzív feldarabolásra és finomszemcsés taskokra készült work-stealinggel. Minden worker saját deque-t tart, és az üres worker másoktól lop munkát ahelyett, hogy központi sorra várna. Ez CPU-intenzív divide-and-conquer algoritmusoknál hatékony, de blokkoló I/O mellett veszélyes lehet, ha nem érted a kompenzáció és a managedBlock működését. A common poolra küldött blokkoló HTTP vagy adatbázis hívások gyakran starvationhöz vezetnek.
Az API memória-modell garanciákat is hordoz. A task beadása egy executorba, egy Future befejeződése vagy egy CompletableFuture függő stage-ének lezárása mind létrehozhat hasznos publication edge-eket. Ez azonban nem teszi automatikusan szálbiztossá a task belsejében lévő shared state-et. Az executor csak ütemez; a thread safety továbbra is a te felelősséged.
Gyakorlati használat
Ha az operációs viselkedés számít, jobb az explicit poolépítés, mint a convenience factoryk vak használata. Az Executors.newFixedThreadPool() példákban rendben van, de productionben a ThreadPoolExecutor explicit core size, max size, queue típus, thread factory és rejection handler mellett sokkal jobban átlátható. A thread naming és a rejection policy nem apró részletek, hanem a komponens szerződésének részei.
Az executor ownershipöt komolyan kell venni. Ha egy komponens létrehoz egy poolt, jellemzően neki is kell leállítania. Ha a pool megosztott infrastruktúra, a komponens ne zárja le véletlenül. Shutdown során előbb shutdown(), majd awaitTermination(), és csak szükség esetén shutdownNow(). Ha kihagyod a shutdown lépést, thread leak lesz; ha reflexből shutdownNow()-t hívsz, félbehagyhatsz érzékeny munkát.
A CompletableFuture legyen orkhesztrációs eszköz, ne engedély arra, hogy mindent aszinkronná tegyél. Minden stage-nek kell végrehajtási kontextus. Ha nem adsz meg executort, az async variánsok gyakran a common poolt használják, amelybe így különböző workloadok keveredhetnek. Latency-érzékeny rendszereknél a blokkoló vagy üzletileg kritikus stage-ekhez adj dedikált executort, hogy a pool-interferencia látható és kontrollálható maradjon.
Kód példák
ThreadFactory factory = runnable -> {
Thread t = new Thread(runnable);
t.setName("pricing-worker-" + THREAD_ID.incrementAndGet());
t.setDaemon(false);
return t;
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
8,
16,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
factory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
Future<Price> future = executor.submit(() -> pricingService.calculate(order));
Price price = future.get(200, TimeUnit.MILLISECONDS);
Itt minden operációs döntés explicit: bounded queue, bounded max threadszám, névvel ellátott workerek és olyan rejection policy, amely visszanyomja a terhelést a hívóra ahelyett, hogy végtelenül pufferezne.
CompletableFuture<Customer> customerFuture = CompletableFuture
.supplyAsync(() -> customerClient.fetch(customerId), ioExecutor);
CompletableFuture<Balance> balanceFuture = CompletableFuture
.supplyAsync(() -> accountClient.fetchBalance(customerId), ioExecutor);
CompletableFuture<Summary> summaryFuture = customerFuture.thenCombine(
balanceFuture,
Summary::new
).orTimeout(300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> Summary.fallback(customerId));
Summary summary = summaryFuture.join();
A lényeg a strukturált kompozíció: két független I/O egyszerre indul, az eredményük kombinálódik, a timeout és a fallback pedig a pipeline része, nem pedig széttördelt imperatív hibakezelés.
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_000;
private final long[] values;
private final int start;
private final int end;
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) sum += values[i];
return sum;
}
int mid = (start + end) >>> 1;
SumTask left = new SumTask(values, start, mid);
SumTask right = new SumTask(values, mid, end);
left.fork();
long rightResult = right.compute();
return rightResult + left.join();
}
}
Ez a klasszikus fork/join minta: az egyik ágat forkolod, a másikat lokálisan számolod, végül joinolsz. Így kisebb a task overhead és jobban kihasználható a work stealing.
Trade-offok
Az executor-alapú konkurencia javítja az újrafelhasználást, az observabilityt és a kontrollt, de olyan queueing szemantikát is behoz, amely rossz választás esetén elrejtheti az overloadot. Az unbounded queue stabil threadszámot ad, miközben a latencyt és a memóriát növeli észrevétlenül. A bounded queue és a rejection explicitté teszi a hibát, viszont feljebb tolja a felelősséget. Nincs ingyenes választás; minden pool egy load-shedding policyt kódol, akár kimondod, akár nem.
A Future egyszerű és jó egy-egy task eredményére, főleg ha amúgy is van blokkoló boundary. A CompletableFuture kifejezőbb, de a hosszú láncok nehezebben debuggolhatók, és a kivételterjedés szabályait érteni kell. A ForkJoinPool kiváló CPU-bound rekurzív munkára, de gyenge választás tetszőleges blokkoló feladatra, ha nem érted a worker-modellt.
A fő trade-off architekturális: a task API-k leválasztják a beadást a végrehajtásról, ami erős eszköz, de könnyű elveszíteni az ownershipöt. Ha senki nem tudja, ki birtokolja a poolt, ki monitorozza a telítettséget, és ki definiálja a cancellation szemantikát, akkor a kód konkurens, de a rendszer operatívan kaotikus.
Gyakori hibák
Visszatérő hiba az Executors factoryk vak használata. Például a fixed thread pool unbounded LinkedBlockingQueue-t használ, ezért az overload növekvő queue méretként és latencyként jelenik meg azonnali rejection helyett. Ugyancsak gyakori, hogy a tesztekben, háttérszolgáltatásokban vagy CLI utilitykben létrehozott executorokat nem állítják le, emiatt non-daemon threadek életben tartják a JVM-et.
A Future esetén sok fejlesztő túl korán blokkol. Ha minden submit után rögtön future.get() következik, akkor kifizetted a concurrency komplexitását, de throughputot nem nyertél. A cancel szemantikát is sokan félreértik: a future.cancel(true) interruptot kér, de a tasknak együttműködően kell leállnia, különben a cancellation csak papíron történik meg.
A CompletableFuture legnagyobb csapdája a common pool véletlen használata blokkoló munkához. Egy másik buktató a join() és a get() keverése az exception modell ismerete nélkül: a join() CompletionException-be csomagol, míg a get() checked wrapper kivételeket dob. ForkJoinPool esetén worker I/O-n vagy hosszú lockvárakozáson történő blokkolása starvationt okozhat, mert ez a scheduler alapvetően CPU munkára van optimalizálva.
Senior szintű meglátások
Senior szemmel a pool erőforrás-governance eszköz, nem convenience wrapper. A poolméret egy állítás az elérhető párhuzamosságról, a queue mérete egy állítás a tolerált latencyről és memóriapufferelésről, a rejection policy pedig egy állítás a hibamódról. Ha ezek a számok nincsenek SLA-hoz, downstream kapacitáshoz és mért workload formához kötve, akkor csak találgatások.
A CompletableFuture kompozíció tükrözze a domain boundary-kat. A független távoli hívások párhuzamosíthatók, a függő stage-eknek pedig a kauzalitást és a timeout budgetet kell megőrizniük. Ha minden alapértelmezés szerint async lesz, romlik a trace-elhetőség és a failure attribution. A jó tervek kevés, explicit és jól mérhető aszinkron határt tartalmaznak.
A ForkJoinPool common pool processz-szintű megosztott infrastruktúra. Úgy kezeld, mint egy közös adatbázis connection poolt: ne önts bele ismeretlen workloadot gondolkodás nélkül. Blokkoló munka menjen dedikált executorra. CPU-bound divide-and-conquer menjen fork/joinra. A legtöbb production tanulság itt nem a szintaxisról, hanem a workload forma és a scheduler design összeillesztéséről szól.
Szószedet
- Executor: Minimális absztrakció taskok futtatására.
- ExecutorService: Lifecycle és eredménykezelést is adó executor.
- ThreadPoolExecutor: Konfigurálható thread pool implementáció.
- Future: Függőben lévő eredmény vagy cancellation állapot fogantyúja.
- CompletableFuture: Komponálható aszinkron stage API.
- ForkJoinPool: Work-stealing scheduler finomszemcsés CPU taskokra.
- Rejection policy: Stratégia arra az esetre, amikor a pool nem tud több munkát fogadni.
- Work stealing: Olyan ütemezés, ahol az üres worker más workertől lop feladatot.
Gyorsreferencia
- Inkább task beadás, mint kézi thread kezelés.
- Poolméret, queue, threadnév és rejection policy legyen explicit.
- Birtokold a lifecycle-t:
shutdown(),awaitTermination(), majd szükség esetén eszkaláció. - Backpressure esetén használj bounded queue-t.
- Egyszerű egyedi eredményhez
Future, kompozícióhozCompletableFuture. - Ne hívd rögtön minden submit után a
future.get()-et, hacsak tényleg blokkolni akarsz. - Blokkoló vagy kritikus stage-ekhez adj explicit executort.
- Tetszőleges blokkoló I/O ne menjen a
ForkJoinPoolcommon poolra. - Mérd a telítettséget: aktív threadek, queue mélység, rejectionök, task latency.
- A pool design production szerződés, nem implementációs részlet.
🎮 Játékok
10 kérdés