Spring WebFlux
Mono, Flux, non-blocking IO, reactive streams, backpressure
Spring WebFlux
A Spring WebFlux a Spring reaktív webes keretrendszere, amely non-blocking I/O-ra épül és a Reactive Streams specifikációt valósítja meg Project Reactor segítségével.
1. Definíció
A Spring WebFlux a Spring Framework 5-ben bevezetett reaktív webes modul, amely a Reactive Streams specifikációra épül (Publisher, Subscriber, Subscription, Processor). A megvalósítást a Project Reactor könyvtár biztosítja, két fő típussal: Mono (0 vagy 1 elem) és Flux (0-N elem).
A WebFlux non-blocking I/O modellt használ: kevés event loop thread kezeli az összes kérést, és egyetlen thread sem blokkolódik várakozás közben. Ez lehetővé teszi, hogy kevesebb erőforrással több egyidejű kapcsolatot kezeljen.
Client → Netty Event Loop → Handler → Reactive Pipeline
→ Non-blocking I/O → Response (back-pressure aware)
A WebFlux két programozási modellt támogat:
- Annotáció-alapú:
@RestController+Mono/Flux(ismerős Spring MVC stílus) - Funkcionális:
RouterFunction+HandlerFunction(lambda-alapú routing)
2. Alapfogalmak
Mono és Flux
| Típus | Elemszám | Analógia |
|---|---|---|
| Mono<T> | 0 vagy 1 | Optional<T> / CompletableFuture<T> |
| Flux<T> | 0-N | Stream<T> / List<T> |
// Mono: egyetlen elem vagy üres
Mono<User> user = userRepository.findById(id);
// Flux: több elem stream-ként
Flux<User> users = userRepository.findAll();
// Mono létrehozás
Mono.just("Hello"); // Egy elem
Mono.empty(); // Üres
Mono.error(new Exception()); // Hiba
// Flux létrehozás
Flux.just("A", "B", "C"); // Fix elemek
Flux.fromIterable(list); // Kollekcióból
Flux.interval(Duration.ofSeconds(1)); // Végtelen időzített
Backpressure (ellennyomás)
A backpressure a Reactive Streams alapmechanizmusa: a Subscriber jelzi a Publisher-nek, hogy mennyi elemet tud feldolgozni. Ez megakadályozza a memória túlcsordulást gyors producer és lassú consumer esetén.
Flux.range(1, 1_000_000)
.onBackpressureBuffer(256) // Buffer 256 elemig
.onBackpressureDrop() // Eldobja a felesleget
.onBackpressureLatest() // Csak a legutolsót tartja
.subscribe(item -> process(item));
Non-blocking I/O
A hagyományos Spring MVC thread-per-request modellel szemben a WebFlux event loop modellt használ:
- Netty: alapértelmezett szerver (nem servlet-alapú)
- Event loop thread-ek: tipikusan CPU mag × 2
- Nem szabad blokkolni:
Thread.sleep(), JDBC,synchronized→ tilos! - Blokkoló hívás →
Schedulers.boundedElastic()wrapperre kell tenni
3. Gyakorlati használat
Annotáció-alapú controller
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserRepository userRepository;
public UserController(UserRepository userRepository) {
this.userRepository = userRepository;
}
@GetMapping
public Flux<UserDto> findAll() {
return userRepository.findAll()
.map(UserDto::from);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<UserDto>> findById(@PathVariable String id) {
return userRepository.findById(id)
.map(user -> ResponseEntity.ok(UserDto.from(user)))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> create(@Valid @RequestBody Mono<CreateUserRequest> request) {
return request
.map(req -> new User(req.name(), req.email()))
.flatMap(userRepository::save)
.map(UserDto::from);
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> delete(@PathVariable String id) {
return userRepository.deleteById(id);
}
}
Funkcionális routing (RouterFunction)
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.GET("/api/users", handler::findAll)
.GET("/api/users/{id}", handler::findById)
.POST("/api/users", handler::create)
.DELETE("/api/users/{id}", handler::delete)
.build();
}
}
@Component
public class UserHandler {
private final UserRepository userRepository;
public UserHandler(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Mono<ServerResponse> findAll(ServerRequest request) {
return ServerResponse.ok()
.body(userRepository.findAll(), User.class);
}
public Mono<ServerResponse> findById(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.map(req -> new User(req.name(), req.email()))
.flatMap(userRepository::save)
.flatMap(user -> ServerResponse
.created(URI.create("/api/users/" + user.getId()))
.bodyValue(user));
}
public Mono<ServerResponse> delete(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.deleteById(id)
.then(ServerResponse.noContent().build());
}
}
WebClient (reaktív HTTP kliens)
@Service
public class ExternalApiService {
private final WebClient webClient;
public ExternalApiService(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE)
.build();
}
public Mono<ProductDto> getProduct(String id) {
return webClient.get()
.uri("/products/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError,
resp -> Mono.error(new NotFoundException("Not found")))
.bodyToMono(ProductDto.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
}
public Flux<ProductDto> getAllProducts() {
return webClient.get()
.uri("/products")
.retrieve()
.bodyToFlux(ProductDto.class);
}
}
4. Kód példák
Operátor láncolás
public Mono<OrderSummary> processOrder(String orderId) {
return orderRepository.findById(orderId) // Mono<Order>
.switchIfEmpty(Mono.error(
new NotFoundException("Order not found")))
.flatMap(order -> {
Mono<User> user = userService.findById(order.getUserId());
Mono<List<Product>> products =
productService.findByIds(order.getProductIds())
.collectList();
return Mono.zip(Mono.just(order), user, products);
})
.map(tuple -> new OrderSummary(
tuple.getT1(), tuple.getT2(), tuple.getT3()))
.doOnSuccess(s -> log.info("Order processed: {}", s.id()))
.doOnError(e -> log.error("Order failed", e));
}
Server-Sent Events (SSE)
@GetMapping(value = "/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(seq))
.event("price-update")
.data(stockService.getCurrentPrice())
.build());
}
Blokkoló hívás wrappelése
// ROSSZ: blokkoló JDBC hívás event loop thread-en
public Mono<User> findUser(Long id) {
User user = jdbcTemplate.queryForObject(...); // BLOKKOLÓ!
return Mono.just(user);
}
// JÓ: Schedulers.boundedElastic()-re delegálás
public Mono<User> findUser(Long id) {
return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
.subscribeOn(Schedulers.boundedElastic());
}
Hibakezelés operátorokkal
public Mono<UserDto> findUser(String id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(
new NotFoundException("User not found: " + id)))
.onErrorResume(TimeoutException.class,
e -> Mono.just(UserDto.fallback()))
.onErrorMap(DatabaseException.class,
e -> new ServiceException("DB error", e))
.doOnError(e -> log.error("Error finding user", e));
}
5. Trade-offok
WebFlux vs Spring MVC
| Szempont | WebFlux | Spring MVC |
|---|---|---|
| I/O modell | Non-blocking (event loop) | Blocking (thread-per-request) |
| Szerver | Netty (alapért.) / Undertow | Tomcat (alapért.) / Jetty |
| Adatbázis | R2DBC, MongoDB Reactive | JDBC, JPA/Hibernate |
| HTTP kliens | WebClient | RestTemplate / RestClient |
| Thread-ek | Kevés (CPU × 2) | Sok (~200 alapért.) |
| Skálázás | Horizontális, kevesebb RAM | Vertikális, több thread |
| Debugolás | Nehéz (reaktív lánc) | Egyszerű (stack trace) |
| Tanulási görbe | Magas | Alacsony |
| Tesztelés | StepVerifier | MockMvc |
Mikor WebFlux
- Magas párhuzamosság (10 000+ egyidejű kapcsolat)
- Streaming (SSE, WebSocket, Kafka)
- Microservice gateway / API gateway
- Non-blocking adatbázis (MongoDB, R2DBC)
Mikor NEM WebFlux
- CRUD alkalmazás hagyományos JDBC/JPA-val
- Csapat nem ismeri a reaktív programozást
- Blokkoló third-party könyvtárak dominálnak
- Egyszerű REST API kevés párhuzamos kéréssel
6. Gyakori hibák
❌ Blokkoló hívás event loop thread-en
// ROSSZ: JDBC blokkolja az event loop-ot
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
User user = jdbcTemplate.queryForObject(...); // TILOS!
return Mono.just(user);
}
// JÓ: boundedElastic-re delegálás
public Mono<User> getUser(Long id) {
return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
.subscribeOn(Schedulers.boundedElastic());
}
❌ subscribe() hívás a controller-ben
// ROSSZ: manuális subscribe → nincs backpressure, nincs lifecycle
@PostMapping
public void create(@RequestBody CreateUserRequest req) {
userService.create(req).subscribe(); // NE TEDD!
}
// JÓ: visszaadni a Mono-t a framework-nek
@PostMapping
public Mono<UserDto> create(@RequestBody CreateUserRequest req) {
return userService.create(req);
}
❌ Mono.block() használata
// ROSSZ: block() blokkolja a thread-et
User user = userRepository.findById(id).block(); // TILOS reactive-ban!
// JÓ: flatMap-pel láncolás
return userRepository.findById(id)
.flatMap(user -> processUser(user));
❌ Hibás hibakezelés (try-catch)
// ROSSZ: imperatív try-catch nem működik reaktív láncban
try {
return userService.findById(id);
} catch (Exception e) {
return Mono.empty(); // Sosem hívódik!
}
// JÓ: onErrorResume operátor
return userService.findById(id)
.onErrorResume(e -> Mono.empty());
7. Mélyebb összefüggések
Event loop modell (Netty)
A Netty event loop ciklusa:
- Accept: új kapcsolat elfogadása
- Read: adat olvasása (non-blocking)
- Decode: bejövő adat feldolgozása
- Process: handler meghívása (pipeline)
- Encode: kimenő adat serializálása
- Write: adat küldése (non-blocking)
- Flush: buffer kiírása
Az event loop thread-ek száma tipikusan: Runtime.getRuntime().availableProcessors() × 2.
Kritikus szabály: egyetlen event loop thread-et sem szabad blokkolni, mert az az összes ezen a thread-en lévő kapcsolatot blokkolja.
Scheduler típusok
| Scheduler | Használat |
|---|---|
Schedulers.immediate() |
Aktuális thread (alapértelmezett) |
Schedulers.single() |
Egyetlen újrahasznált thread |
Schedulers.parallel() |
CPU-intenzív munka (CPU mag számú thread) |
Schedulers.boundedElastic() |
Blokkoló I/O wrappelés (bounded pool) |
Schedulers.fromExecutor() |
Egyedi ExecutorService |
Hot vs Cold publisher
// Cold: minden subscriber-nek újraindul
Flux<Integer> cold = Flux.range(1, 3);
cold.subscribe(i -> log.info("Sub1: {}", i)); // 1, 2, 3
cold.subscribe(i -> log.info("Sub2: {}", i)); // 1, 2, 3
// Hot: valós idejű, megosztott
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hot = sink.asFlux();
hot.subscribe(s -> log.info("Sub1: {}", s));
sink.tryEmitNext("A"); // Sub1: A
hot.subscribe(s -> log.info("Sub2: {}", s));
sink.tryEmitNext("B"); // Sub1: B, Sub2: B
Tesztelés StepVerifier-rel
@Test
void shouldReturnAllUsers() {
Flux<User> users = userRepository.findAll();
StepVerifier.create(users)
.expectNextCount(3)
.verifyComplete();
}
@Test
void shouldHandleError() {
Mono<User> error = userService.findById("invalid");
StepVerifier.create(error)
.expectError(NotFoundException.class)
.verify();
}
8. Interjúkérdések
Mi a különbség a Mono és a Flux között? Mono: 0 vagy 1 elem (egyetlen érték vagy üres). Flux: 0-N elem (stream). Mindkettő Publisher implementáció (Reactive Streams).
Mi a backpressure és miért fontos? A Subscriber jelzi a Publisher-nek, mennyi elemet tud feldolgozni. Megakadályozza a memória túlcsordulást gyors producer és lassú consumer esetén.
Miért tilos blokkoló hívást tenni event loop thread-en? Az event loop thread az összes rá beosztott kapcsolatot kezeli. Ha blokkolódik, minden kapcsolat megáll. Blokkoló hívás →
Schedulers.boundedElastic().Mi a különbség a map() és flatMap() között?
map(): szinkron transzformáció (T → R).flatMap(): aszinkron transzformáció (T → Mono<R>/Flux<R>). FlatMap-et használunk, ha a transzformáció maga is reaktív.Mi a különbség a hot és cold Publisher között? Cold: minden subscriber-nek újraindul (pl. DB lekérdezés). Hot: valós idejű, megosztott (pl. WebSocket, SSE).
Hogyan tesztelsz reaktív kódot?
StepVerifier.create(publisher).expectNext(...).verifyComplete(). A StepVerifier ellenőrzi az elemeket, hibákat és a befejezést.Mikor használj WebFlux-ot Spring MVC helyett? Magas párhuzamosság (10k+ kapcsolat), streaming, non-blocking DB (R2DBC), microservice gateway. NEM: hagyományos JDBC/JPA, blokkoló könyvtárak.
9. Szószedet
| Fogalom | Jelentés |
|---|---|
| Mono<T> | 0 vagy 1 elemet tartalmazó reaktív publisher |
| Flux<T> | 0-N elemet tartalmazó reaktív publisher |
| Backpressure | Subscriber jelzi a Publisher-nek a feldolgozási kapacitást |
| Event loop | Non-blocking I/O szál, kapcsolatok multiplexelése |
| Reactive Streams | Publisher/Subscriber specifikáció (java.util.concurrent.Flow) |
| Project Reactor | Reactive Streams implementáció (Mono, Flux) |
| WebClient | Reaktív HTTP kliens (RestTemplate helyett) |
| R2DBC | Reactive Relational Database Connectivity |
| StepVerifier | Reaktív stream tesztelő eszköz |
| RouterFunction | Funkcionális routing (lambda-alapú) |
| Schedulers | Thread pool kezelés reaktív kontextusban |
| SSE | Server-Sent Events (egyirányú szerver → kliens stream) |
10. Gyorsreferencia
TÍPUSOK:
Mono<T> 0 vagy 1 elem
Flux<T> 0-N elem
Mono.just(x) Egy elem
Mono.empty() Üres
Flux.fromIterable(list) Kollekcióból
OPERÁTOROK:
.map(T -> R) Szinkron transzformáció
.flatMap(T -> Mono<R>) Aszinkron transzformáció
.filter(predicate) Szűrés
.switchIfEmpty(alt) Fallback ha üres
.zip(mono1, mono2) Párhuzamos összevárás
.collectList() Flux → Mono<List>
.doOnNext/Error/Success Mellékhatás (logging)
HIBAKEZELÉS:
.onErrorResume(e -> alt) Fallback publisher
.onErrorMap(e -> newE) Exception transzformáció
.retry(3) Újrapróbálás
.timeout(Duration) Időtúllépés
SCHEDULER:
Schedulers.parallel() CPU-intenzív
Schedulers.boundedElastic() Blokkoló I/O wrappelés
.subscribeOn(scheduler) Feliratkozás thread
TESZTELÉS:
StepVerifier.create(pub)
.expectNext(x)
.expectError(E.class)
.verifyComplete()
🎮 Játékok
10 kérdés