This is the first release of Reactor 3.1, part of the Bismuth-RELEASE Release Train.
This version is a recommended update for all Reactor 3 users, and will be the one backing Spring Framework 5.0. A lot of API polishing has occurred between the last 3.0.x release and this one, so please read the release notes below carefully if you didn't progressively upgrade through the various MILESTONEs and RCs. (If you only care about what changed since RC1, please have a look at the RC2 release note.)
The release notes below are written from the perspective of a migration from the latest 3.0.x version and cover changes across all the 3.1.0 pre-releases.
Note that since pre-release v3.1.0.M2, the source code for the reactor-test artifact is part of the reactor-core GitHub repository.
⚠️ Update considerations and deprecations
- Reactor has been updated to pass the TCK of the new Reactive Streams specification 1.0.1.RC2. A new StrictSubscriber enforces RS rules more strictly when going out of the Reactor world and bridging to an external Publisher (#711)
- The reactor-test artifact is now part of the io.projectreactor groupId (and reactor-core GitHub repository)
- Kotlin extensions for core and test are now part of the reactor-core repo and artifacts
- behavioral changes
- Flux/Mono method changes
  - ParallelFlux#subscribe(lambdas): lambda-based variants now return a Disposable (composite of all rails subscribers, #800)
  - Flux#toIterable now takes an int for its batchSize argument (f3e13bf)
  - Mono.doOnTerminate and doAfterTerminate now take a simple Runnable (aligning the API with Flux). The old doOnTerminate(BiConsumer<Throwable, T>) can be achieved with the added doOnSuccessOrError, and the old doAfterTerminate(BiConsumer<Throwable, T>) with doAfterSuccessOrError(BiConsumer<Throwable, T>). (#836 + 80a3210)
  - Flux.firstEmitting is now called Flux.first and Flux.firstEmittingWith is now called Flux.or, aligning APIs with those of Mono (#849)
  - static Mono.empty(Publisher<?>) has been removed, prefer using the new Mono.when(p) (2e7fdf3)
  - buffer operators that take a (Publisher, Function) as arguments (open boundary and closing selector) have been renamed to bufferWhen. This avoids the ambiguity that multiple lambda-based signatures can create (even more so in languages like Kotlin) (#542)
  - delay in Flux is removed in favor of Flux.delayElements (see #263)
  - Mono#untilOther removed (use delayUntil(it -> other.take(1)) instead, see #558)
- then:
  - variants of then that took a Supplier have been removed. Use defer if you really need to provide the continuation Mono lazily (#547)
  - Flux.then(Publisher<Void>) has been removed in favor of thenEmpty
  - Mono.then(Function) is now Mono.flatMap
- Mono zip vs when
  - zip is the operator dedicated to producing Tuples (or equivalent combinations) of the elements from multiple sources. As a consequence, the Mono#when and Mono#and variants that produced a Tuple have now been renamed zip and zipWith (#789 + 52f7f04)
  - Mono#when and Mono#and still exist, as operators in the same family as then: they only care about completion signals, discarding elements and returning a Mono<Void>.
  - Mono#zip with an iterable: for consistency, swap the arguments (old "function, iterable" version is deprecated, see #619)
  - zip(Iterable) and zip(Function, Iterable) in Flux have been removed (#338, #619)
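As a rough illustration of the zip vs when split described above, the sketch below combines two sources with zip and zipWith, then waits for the same sources with when. The class and method names are made up for this example, and it assumes reactor-core 3.1 or later on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

// Hypothetical class, for illustration only.
public class ZipVersusWhen {

    // zip: the values of both sources are kept and combined into a Tuple2.
    static Mono<Tuple2<String, Integer>> combined() {
        return Mono.zip(Mono.just("a"), Mono.just(1));
    }

    // zipWith: instance flavor of zip, here mapped to a single String.
    static Mono<String> combinedWith() {
        return Mono.just("a")
                   .zipWith(Mono.just(1))
                   .map(tuple -> tuple.getT1() + tuple.getT2());
    }

    // when: only the completion of the sources matters, values are discarded.
    static Mono<Void> completionOnly() {
        return Mono.when(Mono.just("a"), Mono.just(1));
    }

    public static void main(String[] args) {
        System.out.println(combined().block());
        System.out.println(combinedWith().block());
        System.out.println(completionOnly().block());
    }
}
```

Blocking on the when variant yields null because a Mono<Void> completes without a value.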
- ⚠️ Mono.then(Function) and Mono.flatMap(): how to migrate (#516):
  - first replace all usage of Mono.flatMap with flatMapMany
  - then replace all usage of Mono.then(Function) (which has been removed) with flatMap.
  - the gist of this change is that the old flatMap was actually changing the return type to Flux and was thus missing the "Many" suffix we usually use. At the same time, then(Function) was transforming the source value (unlike any other form of then, which usually just ignores the emissions and acts on the onComplete). Thus it was actually more of a flatMap.
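The two migration steps above can be sketched as follows. The findName and findRoles lookups are hypothetical helpers invented for the example, and the 3.0.x spellings appear only in comments since they no longer compile against 3.1.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
public class FlatMapMigration {

    // Hypothetical lookup returning a single value per id.
    static Mono<String> findName(int id) {
        return Mono.just("user-" + id);
    }

    // Hypothetical lookup returning several values per id.
    static Flux<String> findRoles(int id) {
        return Flux.just("reader-" + id, "writer-" + id);
    }

    // 3.0.x: Mono.just(id).flatMap(i -> findRoles(i))
    // 3.1:   the Mono-to-Flux composition carries the "Many" suffix.
    static Flux<String> rolesOf(int id) {
        return Mono.just(id).flatMapMany(FlatMapMigration::findRoles);
    }

    // 3.0.x: Mono.just(id).then(i -> findName(i))
    // 3.1:   the Mono-to-Mono composition is spelled flatMap.
    static Mono<String> nameOf(int id) {
        return Mono.just(id).flatMap(FlatMapMigration::findName);
    }

    public static void main(String[] args) {
        System.out.println(nameOf(1).block());
        System.out.println(rolesOf(1).collectList().block());
    }
}
```

Doing the flatMap to flatMapMany rename first matters: once then(Function) calls are rewritten to flatMap, the two kinds of call sites can no longer be told apart by name.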
- error handling operators have been made more consistent between Mono and Flux (#533, #535, #531)
  - Flux: onErrorResumeWith -> onErrorResume, mapError -> onErrorMap, switchOnError -> replace with onErrorResume with a lambda that ignores the left hand side.
  - Mono: otherwise -> onErrorResume, otherwiseReturn -> onErrorReturn, otherwiseIfEmpty -> switchIfEmpty, mapError -> onErrorMap
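To make the renames above concrete, here is a small sketch that uses the 3.1 names, with the 3.0.x names they replace noted in comments. The class, method names and messages are illustrative assumptions, not taken from the release.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
public class ErrorOperatorMigration {

    // Flux: mapError -> onErrorMap, onErrorResumeWith -> onErrorResume.
    static Flux<String> fluxWithFallback() {
        return Flux.<String>error(new IllegalStateException("boom"))
                   .onErrorMap(e -> new RuntimeException("wrapped: " + e.getMessage()))
                   .onErrorResume(e -> Flux.just("fallback", e.getMessage()));
    }

    // Mono: otherwiseReturn -> onErrorReturn.
    static Mono<String> monoWithDefault() {
        return Mono.<String>error(new IllegalStateException("boom"))
                   .onErrorReturn("default");
    }

    // Mono: otherwiseIfEmpty -> switchIfEmpty.
    static Mono<String> monoWithAlternative() {
        return Mono.<String>empty()
                   .switchIfEmpty(Mono.just("alternative"));
    }

    public static void main(String[] args) {
        System.out.println(fluxWithFallback().collectList().block());
        System.out.println(monoWithDefault().block());
        System.out.println(monoWithAlternative().block());
    }
}
```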
- window:
  - various window operators have been aligned in behavior with respect to cancellation (#384)
  - window operators that take a (Publisher, Function) as arguments (open boundary and closing selector) have been renamed to windowWhen. This avoids the ambiguity that multiple lambda-based signatures can create (even more so in languages like Kotlin) (#542)
  - Flux.window() has been removed in favor of window(Publisher), windowUntil and windowWhile (see #260)
  - Flux.windowTimeOrSize: the timeout now starts right from subscription, resulting in potentially empty windows being emitted. It also restarts if the timeout expires, so several empty windows could be emitted, e.g. 3 for a timeout of 100ms if the Flux doesn't emit for 300ms. The equivalent buffer version remains unchanged. (#484)
  - windowUntil/windowWhile have Flux windows instead of GroupedFlux (#759)
- Cross operator changes based on prefix / parameter pattern:
  - xxxMillis: Timed operators with the Millis suffix have been removed, use their Duration based counterparts
  - xxx(boolean delayError): All operators that were taking a delayError boolean parameter instead of having a DelayError suffixed variant now use the suffix variant pattern (see #480)
  - xxx(Function<Throwable, ?>): Operators dealing with a Predicate or Function of Throwable have been aligned to take a more generic <? super Throwable>
  - All subscribe methods in Flux that had a prefetch parameter have been removed (you can use limitRate to achieve the same effect)
- class changes
  - Signal is now an interface and MutableNextSignal has been removed (#779)
  - Old introspection interfaces, which were mostly only used internally, have been removed in favor of the single Scannable interface (Loopback, MultiProducer, MultiReceiver, Producer, Receiver, Trackable, see #249, 20bd64d)
  - Cancellation has been removed and replaced with Disposable (see #322, 844769d)
  - TimedScheduler has been removed, as well as the Scheduler#shutdown method (most schedulers are now fully time capable and you can use dispose()). The few Schedulers that are not time-capable will throw a RejectedExecutionException indicating so whenever one attempts to use schedulePeriodically on them. Note that most timed operators now use the parallel() Scheduler by default. (see #451, #322)
    - Review interrupted flag on dispose (#507)
    - Removed fromExecutorService(exec, boolean) variant
    - Removed factory hook for simple ExecutorService (now all Scheduled)
  - OpenHashSet has been removed, as it was only used for Disposables: use Disposables.composite() (40fbd60)
  - QueueSupplier has been renamed to Queues and is now purely about queue-related utils and static suppliers. It doesn't implement Supplier<Queue> itself anymore. (#733)
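Since Cancellation is replaced by Disposable and OpenHashSet by Disposables.composite(), grouping resources might look like the sketch below. The class name and the AtomicBoolean flag are illustrative assumptions; the example relies on Disposable being usable as a lambda and on subscribe() returning a Disposable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical class, for illustration only.
public class CompositeDisposableSketch {

    // Groups a custom resource and a live subscription, disposes them together,
    // and reports whether everything ended up disposed.
    static boolean disposeTogether() {
        AtomicBoolean released = new AtomicBoolean(false);

        // Disposable has a single abstract method, so a lambda is enough.
        Disposable custom = () -> released.set(true);

        // A subscription that never terminates on its own.
        Disposable subscription = Flux.never().subscribe();

        Disposable.Composite all = Disposables.composite();
        all.add(custom);
        all.add(subscription);

        // Disposing the composite disposes every member.
        all.dispose();

        return released.get() && subscription.isDisposed() && all.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposeTogether());
    }
}
```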
- Hooks
  - Hooks are now cumulative. Setting a hook will add it on top of the existing one(s). You can also name some hooks, which allows updating a sub-hook or only partially resetting a hook. (#687, #784)
  - new hooks onEachOperator + onLastOperator, now Function<Publisher, Publisher>, new operator lift (#775)
  - hooks must now be called with the Context (#830)
- nullability
- Processors have been reworked:
  - You cannot connect explicitly anymore but should rather use the sink() method. For details and rationale, see #525.
  - Processors that are costly to instantiate (TopicProcessor, WorkQueueProcessor) now have a Builder instead of factory methods. However, they keep a couple of significant factory methods (see #471, #616 and #628)
  - Mono#subscribe() returns a Disposable. Use .toProcessor() to get the exact legacy behavior (see #566, #605 and #638)
  - UnicastProcessor won't ignore the overflowStrategy if we can detect that a bounded queue is used. By default, an unbounded queue is used and the strategy is silently ignored. (#612)
  - DirectProcessor and UnicastProcessor now call onErrorDropped and onNextDropped when signalled after termination.
  - Exceptions.argumentIsNullException() has been removed (most of the time replaced by Objects.requireNonNull()). (7f6ff86)
  - MonoProcessor cancel/dispose now signals CancellationException (#792)
- Some changes have been made to the way writing custom operators works
  - Operators.SubscriberAdapter is not part of the public API anymore.
  - Operators.addAndGet(AtomicLong, ...) has been removed in favor of the AtomicLongFieldUpdater alternative, itself renamed to addCap (#371)
    - Removed unused and confusing Operators.addAndGet (#371)
  - MonoSource and FluxSource are now package private, superseded by MonoOperator and FluxOperator (to deal with the new CoreSubscriber)
  - Operators.onNextDropped default behaviour is to log the drop rather than throw a "fail with cancel" exception (#823)
✨ New features and improvements
- ✨ Associating a Context to a reactive sequence ✨ (Flux or Mono) is now possible. This is the major new feature in this milestone. (#117, #210, #337, #447, #704, #705, #723 and various other PRs)
  - Context can be created using factory methods
  - Added CoreSubscriber, a Reactor-specific Subscriber common to all Flux and Mono operators and needed to pass along the Context
  - Flux#subscribe(Subscriber) is now final, and one needs to instead implement subscribe(CoreSubscriber) in custom operators / Flux.
  - The context is enriched during the subscription phase, via the subscriberContext(Function<Context, Context>).
  - There is a static factory method Mono#currentContext that can be used to emit the Context initialized downstream of it (or propagated inside e.g. a flatMap)
  - onErrorDropped/onNextDropped/onOperatorError/onRejectedExecution look for local hooks in the Context (b576f7f)
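A minimal sketch of the Context factory methods mentioned above. It only exercises the Context data structure itself (creation, immutable put, lookups) and deliberately leaves out propagation through a sequence; the keys and values are made up for the example.

```java
import reactor.util.context.Context;

// Hypothetical class, for illustration only.
public class ContextSketch {

    static String describe() {
        // Factory method creating a Context with one key/value pair.
        Context base = Context.of("user", "alice");

        // put does not mutate: it returns a new Context holding both entries.
        Context enriched = base.put("traceId", "t-1");

        String user = enriched.get("user");
        String trace = enriched.get("traceId");
        // getOrDefault avoids an exception when the key is absent.
        String region = enriched.getOrDefault("region", "none");

        // The original Context is untouched by the put above.
        return user + "/" + trace + "/" + region + "/" + base.hasKey("traceId");
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints alice/t-1/none/false
    }
}
```

Because each put returns a new instance, a Function<Context, Context> used to enrich the context should return the result of put rather than rely on side effects.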
- Flux.error and Mono.error are now ScalarCallable and the error case can be fused when source is scalar (#716)
- Introspection is now doable on Scannable classes via an Attr key. Attributes are quasi-enumerations exposing constants rather than enums. This allows splitting them into typed keys with meaningful global defaults so that scan doesn't need a Class to attempt a cast. Operator implementation is based on an Object scan, scanUnsafe. (see #606, #728)
- Added the expand operator (#769)
- Mono#fromCallable/fromSupplier now map null result to empty Mono (#743)
- Null analysis has been enabled through the use of Nullable/NonNullApi annotations (#614, #864, #873, #875)
- Upgrade to Kotlin 1.1.4-3
- Add ability to name and tag a reactive sequence (#579)
- Add a new Console logger, now the default fallback instead of JDK logger (#680)
- Add utility method to check if a Throwable is composite (#770)
- Add Swap & Composite Disposable specializations (#731, bf8d8cb)
- onEachOperator/onLastOperator/onOperatorError hooks can be named, allowing for additive hooks AND partial hook reset (#784)
- Offer way to do requests on original thread with subscribeOn (#777)
- Add take(Duration) and takeUntilOther(Publisher) to Mono (#797)
- Have ParallelFlux lambda subscribe return composite Disposable (#800)
- Add Disposable single(), disposed() and never() factories to a Disposables utility class (#799, #804, #812)
- Add Mono.cache(Duration) (#683)
- Add "not dropped" assertions to StepVerifier assertions (96ae5ca)
- Add a PublisherProbe to easily probe for subscription in tests (#833)
- Return original reference if unwrapped exception is null (#848)
- Reduce WorkQueueProcessor/TopicProcessor sink serialization when it is not necessary (#630, #727)
- Added a distinctUntilChanged variant with a bipredicate to evaluate if there is a change (#608)
- Added a onBackpressureBuffer variant with a TTL (#296)
- Added an optional configurable default timeout on StepVerifier#verify() (#651)
- Added expect|verifyErrorSatisfies StepVerifier error expectations (#670)
- When using Mono.fromRunnable, the resulting Mono's generic type is better inferred (#686)
- Assembly tracking internals have been improved, notably allowing the assembly stacktrace to be cached (#712)
- the checkpoint description is included in Flux/MonoSource#toString (#611)
- checkpoint(String) is now light by default, lowering the cost of using that operator (see #587)
- the Supplier used for distinct can be tuned (#577)
- refCount now has a variant that waits for a grace period before unsubscribing from upstream (#569, #624, #627)
- Added a Flux.then(Mono) operator to align with the Mono equivalent (#547)
- Added Mono.delayUntil to delay the emission of a Mono until after a companion Publisher generated from the source value completes. This acts on onComplete, unlike the now removed untilOther. (#558, #568, #674)
- The error message in case some operators badly behave with backpressure ("Queue full?!") has been made more explicit (#540)
- ParallelSubscriber now has a subscribe() method and all subscribe(...) methods are final like in Flux (#564)
- StepVerifier improvements:
  - Added overloads to Step#expectNext for up to 6 parameters (reactor/reactor-addons#106)
  - Valued expectNextSequence should fail when checking empty Flux (reactor/reactor-addons#98)
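One of the items above, Mono#fromCallable/fromSupplier mapping a null result to an empty Mono, can be observed with a short sketch. The class and method names are illustrative assumptions.

```java
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
public class NullToEmptySketch {

    // A Callable returning null yields an empty Mono, so the default is used.
    static String fromNullCallable() {
        return Mono.<String>fromCallable(() -> null)
                   .defaultIfEmpty("was empty")
                   .block();
    }

    // Same for a Supplier: hasElement reports that nothing was emitted.
    static Boolean supplierHasElement() {
        return Mono.<String>fromSupplier(() -> null)
                   .hasElement()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(fromNullCallable());
        System.out.println(supplierHasElement());
    }
}
```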
🪲 Bug fixes
- Fix recursive context bug on flatMapMany (8e33991)
- Add reason to default Scheduler exception throwing (#854)
- Change Operators subscriptions from enum to final classes (#857)
- Safely suppress exceptions through Exceptions.addSuppressed (#758)
- ParallelFlux now correctly supports conditional subscribers (#865, #389)
- MonoProcessor shouldn't have been a strict subscriber (#866)
- Review int.max prefetch (f3e13bf)
- Multicast cancel now disposes upstream properly (#870)
- Protect serializedSink from illegal subscriber onNext throw (#613)
- Improve progressive demand handling (#371)
- ParallelScheduler does not schedule task evenly between threads (#761)
- Pass failed signal to onOperatorError in trySubscribeScalarMap (#684)
- Remove cancel during onNext for MonoDelayElement (#749)
- Attr.LARGE_BUFFERED defaults back to null (as in "irrelevant") (#751)
- Revert publishOn async path detection over-optimization (#767)
- Exceptions.addThrowable now reuse same root composite (#771)
- Guard ExecutorScheduler against Executor's task failures (#324)
- Catch fused flattenIterable polling failures (#841)
- Fix windowTimeout rejection double cancel (31ef92d)
- VirtualTimeScheduler shutdown is not correctly handled with periods (#776)
- Protect immediate flatMap scalar emissions from out of order consuming (c7d3c12, 3abcb83, ce49077)
- Ensure no Subscription#request throws/errors (4d61440, decfa13)
- Avoid the shortcut of MonoProcessor.onNext(null) for onComplete (#701)
- StepVerifier now ignores empty multivalue expectations, which could previously lead to bad assertions (#650)
- Fixed a bug where macro-fusion of Flux#then and Mono#then would use arrays of incompatible types, resulting in an ArrayStoreException (#661)
- When a doOnNext callback failed while fused, the failure wouldn't be caught by a doOnError (#664)
- filterWhen had a bug in tracking request and produced amount, resulting in under-requesting hangs (#689, #692)
- MonoProcessor could unnecessarily retain references to objects from its source. This is fixed by nulling out the source upon all terminations (#690)
- MonoProcessor#block() had a legacy inner timeout. It will now indefinitely wait for the actual completion. On the other hand, using a negative or 0 timeout in the Duration-based variant will now immediately time out. (#722)
- scan accumulation in a live reduction with a seed would always lag by one. It has been modified so that the seed is sent immediately, which allows the accumulator to run in lockstep with the upstream stage. (#609)
- In Flux.create, setting an onDispose callback on the Sink after its onRequest method was called was ignored. The callback is now explicitly invoked if set after the sink was terminated. (#643)
- a memory leak has been fixed in SingleScheduler.schedule (#578)
- Mono.untilOther would not return a new instance when chained with itself (#515)
- Operators#setOnce was cancelling the wrong subscription
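The scan fix listed above (the seed is sent immediately, then one accumulated value per upstream element) can be seen with a short sketch. The class and method names are made up for the example.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class, for illustration only.
public class ScanSeedSketch {

    // Running total of 1, 2, 3 starting from the seed 0.
    static List<Integer> runningTotals() {
        return Flux.range(1, 3)
                   .scan(0, (total, next) -> total + next)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        // The seed comes first, followed by one value per source element.
        System.out.println(runningTotals()); // prints [0, 1, 3, 6]
    }
}
```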
📖 Documentation, Tests and Build
- The javadoc has been reviewed and polished across the board, in style and content. Most notably, a full review of Flux and Mono javadoc has been made. It also now outputs UTF-8 html, and a few marble diagrams were added (#545, #559, #528, #513, #544, #644, #469, #521, #698, 2cd83c2)
- Some operators' usage restrictions have been better documented in the javadoc (#560, #596, #726)
- A JMH test harness has been added (manual run only)
- Contributing to the documentation is explained and made easier with direct edit links (#801, #808)
- The reference documentation has been completed and polished as well (#516, #535, #555, #580, #853, #838, 0e0b6c9, 89eb394, #862, #404)
- Android compatibility and best effort support is described in README statement (#796)
👍 Thanks to the following contributors who also participated in this release (and all the intermediate milestones)
@akarnokd, @Buzzardo, @Dmitriusan, @fabriziofortino, @garyrussell, @helmbold, @IlyaZinkovich, @jakekdodd, @jimhorng, @kamilszymanski, @lebannen, @making, @markotron, @olibye, @rajinisivaram, @sdeleuze, @schauder, @TomekJurkowski, @violetagg, @zgagnon