Haladó Olvasási idő: ~9 perc

Spring WebFlux

Mono, Flux, non-blocking IO, reactive streams, backpressure

Spring WebFlux

A Spring WebFlux a Spring reaktív webes keretrendszere, amely non-blocking I/O-ra épül és a Reactive Streams specifikációt valósítja meg Project Reactor segítségével.


1. Definíció

A Spring WebFlux a Spring Framework 5-ben bevezetett reaktív webes modul, amely a Reactive Streams specifikációra épül (Publisher, Subscriber, Subscription, Processor). A megvalósítást a Project Reactor könyvtár biztosítja, két fő típussal: Mono (0 vagy 1 elem) és Flux (0-N elem).

A WebFlux non-blocking I/O modellt használ: kevés event loop thread kezeli az összes kérést, és egyetlen thread sem blokkolódik várakozás közben. Ez lehetővé teszi, hogy kevesebb erőforrással több egyidejű kapcsolatot kezeljen.

Client → Netty Event Loop → Handler → Reactive Pipeline
    → Non-blocking I/O → Response (back-pressure aware)

A WebFlux két programozási modellt támogat:

  • Annotáció-alapú: @RestController + Mono/Flux (ismerős Spring MVC stílus)
  • Funkcionális: RouterFunction + HandlerFunction (lambda-alapú routing)

2. Alapfogalmak

Mono és Flux

Típus Elemszám Analógia
Mono<T> 0 vagy 1 Optional<T> / CompletableFuture<T>
Flux<T> 0-N Stream<T> / List<T>
// Mono: egyetlen elem vagy üres
Mono<User> user = userRepository.findById(id);

// Flux: több elem stream-ként
Flux<User> users = userRepository.findAll();

// Mono létrehozás
Mono.just("Hello");          // Egy elem
Mono.empty();                // Üres
Mono.error(new Exception()); // Hiba

// Flux létrehozás
Flux.just("A", "B", "C");           // Fix elemek
Flux.fromIterable(list);            // Kollekcióból
Flux.interval(Duration.ofSeconds(1)); // Végtelen időzített

Backpressure (ellennyomás)

A backpressure a Reactive Streams alapmechanizmusa: a Subscriber jelzi a Publisher-nek, hogy mennyi elemet tud feldolgozni. Ez megakadályozza a memória túlcsordulást gyors producer és lassú consumer esetén.

Flux.range(1, 1_000_000)
    .onBackpressureBuffer(256)    // Buffer 256 elemig
    .onBackpressureDrop()         // Eldobja a felesleget
    .onBackpressureLatest()       // Csak a legutolsót tartja
    .subscribe(item -> process(item));

Non-blocking I/O

A hagyományos Spring MVC thread-per-request modellel szemben a WebFlux event loop modellt használ:

  • Netty: alapértelmezett szerver (nem servlet-alapú)
  • Event loop thread-ek: tipikusan CPU mag × 2
  • Nem szabad blokkolni: Thread.sleep(), JDBC, synchronized → tilos!
  • Blokkoló hívás → Schedulers.boundedElastic() wrapperre kell tenni

3. Gyakorlati használat

Annotáció-alapú controller

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserRepository userRepository;

    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @GetMapping
    public Flux<UserDto> findAll() {
        return userRepository.findAll()
                .map(UserDto::from);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> findById(@PathVariable String id) {
        return userRepository.findById(id)
                .map(user -> ResponseEntity.ok(UserDto.from(user)))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> create(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request
                .map(req -> new User(req.name(), req.email()))
                .flatMap(userRepository::save)
                .map(UserDto::from);
    }

    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> delete(@PathVariable String id) {
        return userRepository.deleteById(id);
    }
}

Funkcionális routing (RouterFunction)

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
                .GET("/api/users", handler::findAll)
                .GET("/api/users/{id}", handler::findById)
                .POST("/api/users", handler::create)
                .DELETE("/api/users/{id}", handler::delete)
                .build();
    }
}

@Component
public class UserHandler {

    private final UserRepository userRepository;

    public UserHandler(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        return ServerResponse.ok()
                .body(userRepository.findAll(), User.class);
    }

    public Mono<ServerResponse> findById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.findById(id)
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
                .map(req -> new User(req.name(), req.email()))
                .flatMap(userRepository::save)
                .flatMap(user -> ServerResponse
                        .created(URI.create("/api/users/" + user.getId()))
                        .bodyValue(user));
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.deleteById(id)
                .then(ServerResponse.noContent().build());
    }
}

WebClient (reaktív HTTP kliens)

@Service
public class ExternalApiService {

    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder builder) {
        this.webClient = builder
                .baseUrl("https://api.example.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE,
                        MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    public Mono<ProductDto> getProduct(String id) {
        return webClient.get()
                .uri("/products/{id}", id)
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError,
                        resp -> Mono.error(new NotFoundException("Not found")))
                .bodyToMono(ProductDto.class)
                .timeout(Duration.ofSeconds(5))
                .retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
    }

    public Flux<ProductDto> getAllProducts() {
        return webClient.get()
                .uri("/products")
                .retrieve()
                .bodyToFlux(ProductDto.class);
    }
}

4. Kód példák

Operátor láncolás

public Mono<OrderSummary> processOrder(String orderId) {
    return orderRepository.findById(orderId)           // Mono<Order>
            .switchIfEmpty(Mono.error(
                    new NotFoundException("Order not found")))
            .flatMap(order -> {
                Mono<User> user = userService.findById(order.getUserId());
                Mono<List<Product>> products =
                        productService.findByIds(order.getProductIds())
                                .collectList();
                return Mono.zip(Mono.just(order), user, products);
            })
            .map(tuple -> new OrderSummary(
                    tuple.getT1(), tuple.getT2(), tuple.getT3()))
            .doOnSuccess(s -> log.info("Order processed: {}", s.id()))
            .doOnError(e -> log.error("Order failed", e));
}

Server-Sent Events (SSE)

@GetMapping(value = "/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> ServerSentEvent.<StockPrice>builder()
                    .id(String.valueOf(seq))
                    .event("price-update")
                    .data(stockService.getCurrentPrice())
                    .build());
}

Blokkoló hívás wrappelése

// ROSSZ: blokkoló JDBC hívás event loop thread-en
public Mono<User> findUser(Long id) {
    User user = jdbcTemplate.queryForObject(...); // BLOKKOLÓ!
    return Mono.just(user);
}

// JÓ: Schedulers.boundedElastic()-re delegálás
public Mono<User> findUser(Long id) {
    return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
            .subscribeOn(Schedulers.boundedElastic());
}

Hibakezelés operátorokkal

public Mono<UserDto> findUser(String id) {
    return userRepository.findById(id)
            .switchIfEmpty(Mono.error(
                    new NotFoundException("User not found: " + id)))
            .onErrorResume(TimeoutException.class,
                    e -> Mono.just(UserDto.fallback()))
            .onErrorMap(DatabaseException.class,
                    e -> new ServiceException("DB error", e))
            .doOnError(e -> log.error("Error finding user", e));
}

5. Trade-offok

WebFlux vs Spring MVC

Szempont WebFlux Spring MVC
I/O modell Non-blocking (event loop) Blocking (thread-per-request)
Szerver Netty (alapért.) / Undertow Tomcat (alapért.) / Jetty
Adatbázis R2DBC, MongoDB Reactive JDBC, JPA/Hibernate
HTTP kliens WebClient RestTemplate / RestClient
Thread-ek Kevés (CPU × 2) Sok (~200 alapért.)
Skálázás Horizontális, kevesebb RAM Vertikális, több thread
Debugolás Nehéz (reaktív lánc) Egyszerű (stack trace)
Tanulási görbe Magas Alacsony
Tesztelés StepVerifier MockMvc

Mikor WebFlux

  • Magas párhuzamosság (10 000+ egyidejű kapcsolat)
  • Streaming (SSE, WebSocket, Kafka)
  • Microservice gateway / API gateway
  • Non-blocking adatbázis (MongoDB, R2DBC)

Mikor NEM WebFlux

  • CRUD alkalmazás hagyományos JDBC/JPA-val
  • Csapat nem ismeri a reaktív programozást
  • Blokkoló third-party könyvtárak dominálnak
  • Egyszerű REST API kevés párhuzamos kéréssel

6. Gyakori hibák

❌ Blokkoló hívás event loop thread-en

// ROSSZ: JDBC blokkolja az event loop-ot
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    User user = jdbcTemplate.queryForObject(...); // TILOS!
    return Mono.just(user);
}

// JÓ: boundedElastic-re delegálás
public Mono<User> getUser(Long id) {
    return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
            .subscribeOn(Schedulers.boundedElastic());
}

❌ subscribe() hívás a controller-ben

// ROSSZ: manuális subscribe → nincs backpressure, nincs lifecycle
@PostMapping
public void create(@RequestBody CreateUserRequest req) {
    userService.create(req).subscribe(); // NE TEDD!
}

// JÓ: visszaadni a Mono-t a framework-nek
@PostMapping
public Mono<UserDto> create(@RequestBody CreateUserRequest req) {
    return userService.create(req);
}

❌ Mono.block() használata

// ROSSZ: block() blokkolja a thread-et
User user = userRepository.findById(id).block(); // TILOS reactive-ban!

// JÓ: flatMap-pel láncolás
return userRepository.findById(id)
        .flatMap(user -> processUser(user));

❌ Hibás hibakezelés (try-catch)

// ROSSZ: imperatív try-catch nem működik reaktív láncban
try {
    return userService.findById(id);
} catch (Exception e) {
    return Mono.empty(); // Sosem hívódik!
}

// JÓ: onErrorResume operátor
return userService.findById(id)
        .onErrorResume(e -> Mono.empty());

7. Mélyebb összefüggések

Event loop modell (Netty)

A Netty event loop ciklusa:

  1. Accept: új kapcsolat elfogadása
  2. Read: adat olvasása (non-blocking)
  3. Decode: bejövő adat feldolgozása
  4. Process: handler meghívása (pipeline)
  5. Encode: kimenő adat serializálása
  6. Write: adat küldése (non-blocking)
  7. Flush: buffer kiírása

Az event loop thread-ek száma tipikusan: Runtime.getRuntime().availableProcessors() × 2.

Kritikus szabály: egyetlen event loop thread-et sem szabad blokkolni, mert az az összes ezen a thread-en lévő kapcsolatot blokkolja.

Scheduler típusok

Scheduler Használat
Schedulers.immediate() Aktuális thread (alapértelmezett)
Schedulers.single() Egyetlen újrahasznált thread
Schedulers.parallel() CPU-intenzív munka (CPU mag számú thread)
Schedulers.boundedElastic() Blokkoló I/O wrappelés (bounded pool)
Schedulers.fromExecutor() Egyedi ExecutorService

Hot vs Cold publisher

// Cold: minden subscriber-nek újraindul
Flux<Integer> cold = Flux.range(1, 3);
cold.subscribe(i -> log.info("Sub1: {}", i)); // 1, 2, 3
cold.subscribe(i -> log.info("Sub2: {}", i)); // 1, 2, 3

// Hot: valós idejű, megosztott
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hot = sink.asFlux();
hot.subscribe(s -> log.info("Sub1: {}", s));
sink.tryEmitNext("A"); // Sub1: A
hot.subscribe(s -> log.info("Sub2: {}", s));
sink.tryEmitNext("B"); // Sub1: B, Sub2: B

Tesztelés StepVerifier-rel

@Test
void shouldReturnAllUsers() {
    Flux<User> users = userRepository.findAll();

    StepVerifier.create(users)
            .expectNextCount(3)
            .verifyComplete();
}

@Test
void shouldHandleError() {
    Mono<User> error = userService.findById("invalid");

    StepVerifier.create(error)
            .expectError(NotFoundException.class)
            .verify();
}

8. Interjúkérdések

  1. Mi a különbség a Mono és a Flux között? Mono: 0 vagy 1 elem (egyetlen érték vagy üres). Flux: 0-N elem (stream). Mindkettő Publisher implementáció (Reactive Streams).

  2. Mi a backpressure és miért fontos? A Subscriber jelzi a Publisher-nek, mennyi elemet tud feldolgozni. Megakadályozza a memória túlcsordulást gyors producer és lassú consumer esetén.

  3. Miért tilos blokkoló hívást tenni event loop thread-en? Az event loop thread az összes rá beosztott kapcsolatot kezeli. Ha blokkolódik, minden kapcsolat megáll. Blokkoló hívás → Schedulers.boundedElastic().

  4. Mi a különbség a map() és flatMap() között? map(): szinkron transzformáció (T → R). flatMap(): aszinkron transzformáció (T → Mono<R>/Flux<R>). FlatMap-et használunk, ha a transzformáció maga is reaktív.

  5. Mi a különbség a hot és cold Publisher között? Cold: minden subscriber-nek újraindul (pl. DB lekérdezés). Hot: valós idejű, megosztott (pl. WebSocket, SSE).

  6. Hogyan tesztelsz reaktív kódot? StepVerifier.create(publisher).expectNext(...).verifyComplete(). A StepVerifier ellenőrzi az elemeket, hibákat és a befejezést.

  7. Mikor használj WebFlux-ot Spring MVC helyett? Magas párhuzamosság (10k+ kapcsolat), streaming, non-blocking DB (R2DBC), microservice gateway. NEM: hagyományos JDBC/JPA, blokkoló könyvtárak.


9. Szószedet

Fogalom Jelentés
Mono<T> 0 vagy 1 elemet tartalmazó reaktív publisher
Flux<T> 0-N elemet tartalmazó reaktív publisher
Backpressure Subscriber jelzi a Publisher-nek a feldolgozási kapacitást
Event loop Non-blocking I/O szál, kapcsolatok multiplexelése
Reactive Streams Publisher/Subscriber specifikáció (java.util.concurrent.Flow)
Project Reactor Reactive Streams implementáció (Mono, Flux)
WebClient Reaktív HTTP kliens (RestTemplate helyett)
R2DBC Reactive Relational Database Connectivity
StepVerifier Reaktív stream tesztelő eszköz
RouterFunction Funkcionális routing (lambda-alapú)
Schedulers Thread pool kezelés reaktív kontextusban
SSE Server-Sent Events (egyirányú szerver → kliens stream)

10. Gyorsreferencia

TÍPUSOK:
  Mono<T>                  0 vagy 1 elem
  Flux<T>                  0-N elem
  Mono.just(x)             Egy elem
  Mono.empty()             Üres
  Flux.fromIterable(list)  Kollekcióból

OPERÁTOROK:
  .map(T -> R)             Szinkron transzformáció
  .flatMap(T -> Mono<R>)   Aszinkron transzformáció
  .filter(predicate)       Szűrés
  .switchIfEmpty(alt)      Fallback ha üres
  .zip(mono1, mono2)       Párhuzamos összevárás
  .collectList()           Flux → Mono<List>
  .doOnNext/Error/Success  Mellékhatás (logging)

HIBAKEZELÉS:
  .onErrorResume(e -> alt) Fallback publisher
  .onErrorMap(e -> newE)   Exception transzformáció
  .retry(3)                Újrapróbálás
  .timeout(Duration)       Időtúllépés

SCHEDULER:
  Schedulers.parallel()         CPU-intenzív
  Schedulers.boundedElastic()   Blokkoló I/O wrappelés
  .subscribeOn(scheduler)       Feliratkozás thread

TESZTELÉS:
  StepVerifier.create(pub)
      .expectNext(x)
      .expectError(E.class)
      .verifyComplete()

🎮 Játékok

10 kérdés