Description
FastStream 0.6 is a significant technical release that aimed to address many of the current project design issues and unlock further improvements on the path to version 1.0.0. We tried our best to minimize breaking changes, but unfortunately, some aspects were simply not working well. Therefore, we decided to break them in order to move forward.
This release includes:
- Finalized Middleware API
- Finalized Router API
- Dynamic subscribers
- Support for various serializer backends (such as Msgspec)
- Support for AsyncAPI 3.0 specification
- A range of minor refactors and improvements
The primary goal of this release is to unlock the path towards further features. Therefore, we are pleased to announce that after this release, we plan to work on MQTT #956 and SQS #794 support and move towards version 1.0.0!
Breaking changes
Firstly, we have dropped support for Python 3.8 and Python 3.9. Python 3.9 is almost at the end of its life cycle, so it's a good time to update our minimum version.
FastStream object changes
The broker is now a positional-only argument, so FastStream(broker=broker) is no longer valid. Always pass the broker positionally, like FastStream(broker).
This is a preparatory step for FastStream(*brokers) support, which will be introduced in 1.0.0.
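The effect of a positional-only parameter can be illustrated with a minimal plain-Python sketch. The App class below is a hypothetical stand-in, not the real FastStream class; it only assumes that the constructor declares the broker before the "/" marker.

```python
class App:
    """Hypothetical stand-in for an application object (not FastStream itself)."""

    def __init__(self, broker, /):
        # Parameters listed before "/" can only be passed by position.
        self.broker = broker


# Positional form: accepted.
app = App("my-broker")

# Keyword form: Python rejects it with a TypeError at call time.
try:
    App(broker="my-broker")
except TypeError as exc:
    error = exc
else:
    error = None
```

In this sketch the keyword call fails before the constructor body runs, which is why the old keyword form has to be updated at every call site.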
AsyncAPI changes
In 0.6, you can no longer pass custom AsyncAPI options directly to the FastStream constructor.
app = FastStream(  # doesn't work anymore
    ...,
    title="My App",
    version="1.0.0",
    description="Some description",
)
You need to create a specification object and pass it to the constructor explicitly.
from faststream import FastStream, AsyncAPI

FastStream(
    ...,
    specification=AsyncAPI(
        title="My App",
        version="1.0.0",
        description="Some description",
    ),
)
Retry feature removed
Previously, you were able to configure retry attempts for a handler by using the following option:
@broker.subscriber("in", retry=True) # was removed
async def handler(): ...
Unfortunately, this option was a design mistake, and we apologize for any confusion it may have caused. Technically, it was just a shortcut for calling message.nack() on error. We decided that explicit acknowledgement control is more idiomatic and a better fit for the framework, so we have replaced it with a new feature: ack_policy control.
@broker.subscriber("test", ack_policy=AckPolicy.ACK_FIRST)
async def handler() -> None: ...
With ack_policy, you can now control the default acknowledgement behavior for your handlers. AckPolicy offers the following options:
- REJECT_ON_ERROR (default) – to permanently discard messages on failure.
- NACK_ON_ERROR – to redeliver messages in case of failure.
- ACK_FIRST – for scenarios with high throughput where some message loss can be acceptable.
- ACK – if you want the message to be acknowledged, regardless of success or failure.
- MANUAL – full manual control over message acknowledgement (for example, calling message.ack() yourself).
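The options above can be modelled with a small plain-Python sketch. Policy and process are hypothetical names that only mirror the descriptions in the list; they are not FastStream's implementation, and acknowledging on success for the error-oriented policies is an assumption based on conventional broker semantics.

```python
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of the AckPolicy options listed above."""

    REJECT_ON_ERROR = "reject_on_error"
    NACK_ON_ERROR = "nack_on_error"
    ACK_FIRST = "ack_first"
    ACK = "ack"
    MANUAL = "manual"


def process(policy, handler):
    """Run the handler and return the acknowledgement actions taken."""
    actions = []
    if policy is Policy.ACK_FIRST:
        actions.append("ack")  # acknowledged before the handler runs
    try:
        handler()
    except Exception:
        if policy is Policy.REJECT_ON_ERROR:
            actions.append("reject")  # message is discarded
        elif policy is Policy.NACK_ON_ERROR:
            actions.append("nack")  # message is redelivered
        elif policy is Policy.ACK:
            actions.append("ack")  # acknowledged despite the failure
    else:
        # Assumption: on success these policies acknowledge the message.
        if policy in (Policy.REJECT_ON_ERROR, Policy.NACK_ON_ERROR, Policy.ACK):
            actions.append("ack")
    # MANUAL never adds anything: the handler is expected to do it itself.
    return actions


def ok():
    return None


def boom():
    raise ValueError("handler failed")
```

For example, process(Policy.NACK_ON_ERROR, boom) yields a single nack in this model, while process(Policy.MANUAL, boom) yields no action at all.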
In addition, we have deprecated a few older options in favor of ack_policy:
- ack_first=True -> AckPolicy.ACK_FIRST
- no_ack=True -> AckPolicy.MANUAL
Context changes
We have made some changes to our Dependency Injection system, so the global context is no longer available.
You can no longer import the context from anywhere and use it freely.
from faststream import context # was removed
Instead, you should create the context in a slightly different way. The FastStream object serves as an entry point for this, so you can place it wherever you need it:
from typing import Annotated

from faststream import Context, ContextRepo, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()

app = FastStream(
    broker,
    context=ContextRepo({
        "global_dependency": "value",
    }),
)
Everything else about using the context remains the same. You can request it from the context at any place that supports it.
Additionally, Context("broker") and Context("logger") have been moved to the local context. They can no longer be accessed from lifespan hooks.
@app.after_startup
async def start(
    broker: Broker  # does not work anymore
): ...

@router.subscriber
async def handler(
    broker: Broker  # still works
): ...
This change was also made to support multiple brokers.
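The difference between application-level and local context values can be sketched in plain Python. MiniContext is a hypothetical stand-in written for illustration, not FastStream's ContextRepo; it only assumes that local values exist for the duration of message handling, as described above.

```python
from contextlib import contextmanager


class MiniContext:
    """Hypothetical stand-in for a context repository (not FastStream's ContextRepo)."""

    def __init__(self, initial=None):
        self._global = dict(initial or {})
        self._local = {}

    @contextmanager
    def scope(self, key, value):
        # Make a value visible only while a message is being handled.
        self._local[key] = value
        try:
            yield
        finally:
            self._local.pop(key, None)

    def get(self, key, default=None):
        # Local values take priority over application-level ones.
        if key in self._local:
            return self._local[key]
        return self._global.get(key, default)


ctx = MiniContext({"global_dependency": "value"})

# Lifespan hook: only application-level values are visible.
at_startup = ctx.get("broker")

# Message handling: the broker is placed in the local scope.
with ctx.scope("broker", "rabbit-broker"):
    in_handler = ctx.get("broker")

# Once handling is over, the local value is gone again.
after_handler = ctx.get("broker")
```

In this model the lookup in the lifespan hook finds nothing, which matches the behavior described for Context("broker") and Context("logger") outside of handlers.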
Middlewares changes
We have also finalized our Middleware API. It now supports all the features we wanted, and we do not plan to change it further. First of all, the BaseMiddleware class constructor now requires a context (which is no longer global).
class BaseMiddleware:
    def __init__(self, msg: Any | None, context: ContextRepo) -> None:
        self.msg = msg
        self.context = context
The context is now available as self.context in all middleware methods.
We also changed the publish_scope function signature.
class BaseMiddleware:  # old signature
    async def publish_scope(
        self,
        call_next: "AsyncFunc",
        msg: Any,
        *args: Any,
        **kwargs: Any,
    ) -> Any: ...
Previously, any options passed to broker.publish("msg", "destination") had to be consumed as *args, **kwargs.
Now, you can consume them all as a single PublishCommand object.
from collections.abc import Awaitable, Callable
from typing import Any

from faststream import PublishCommand

class BaseMiddleware:
    async def publish_scope(
        self,
        call_next: Callable[[PublishCommand], Awaitable[Any]],
        cmd: PublishCommand,
    ) -> Any: ...
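The single-command pattern can be tried out without a broker using a plain-Python sketch. FakePublishCommand, TracingMiddleware and fake_publish are hypothetical names, and the command's fields (body, destination, headers) are assumptions made for illustration rather than the actual PublishCommand attributes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class FakePublishCommand:
    """Hypothetical stand-in for a publish command (fields are assumptions)."""

    body: Any
    destination: str
    headers: dict = field(default_factory=dict)


class TracingMiddleware:
    """Adds a header to every outgoing command before passing it on."""

    async def publish_scope(
        self,
        call_next: Callable[[FakePublishCommand], Awaitable[Any]],
        cmd: FakePublishCommand,
    ) -> Any:
        # All publish options live on one object, so no *args/**kwargs juggling.
        cmd.headers["x-trace-id"] = "demo-trace"
        return await call_next(cmd)


async def fake_publish(cmd: FakePublishCommand) -> dict:
    # Stands in for the broker's real publish call.
    return {"to": cmd.destination, "headers": dict(cmd.headers)}


result = asyncio.run(
    TracingMiddleware().publish_scope(
        fake_publish,
        FakePublishCommand(body="msg", destination="out"),
    )
)
```

Because the middleware receives and forwards one object, changing an option is a plain attribute update instead of rebuilding positional and keyword arguments.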
Thanks to Python 3.13's TypeVars with defaults, BaseMiddleware is now a generic class, so you can specify the PublishCommand type for the broker you want to work with.
from faststream.rabbit import RabbitPublishCommand

class Middleware(BaseMiddleware[RabbitPublishCommand]):
    async def publish_scope(
        self,
        call_next: Callable[[RabbitPublishCommand], Awaitable[Any]],
        cmd: RabbitPublishCommand,
    ) -> Any: ...
Warning: The methods on_consume, after_consume, on_publish and after_publish will be deprecated and removed in version 0.7. Please use consume_scope and publish_scope instead.
Redis Default Message format changes
In FastStream 0.6, BinaryMessageFormatV1 is the default message format instead of JSONMessageFormat.
You can find more details in the documentation: https://faststream.ag2.ai/latest/redis/message_format/
New Features:
- AsyncAPI 3.0 support: you can now choose between AsyncAPI(schema_version="3.0.0") (default) and AsyncAPI(schema_version="2.6.0") schema generation.
- Msgspec native support:
    from fast_depends.msgspec import MsgSpecSerializer
    broker = Broker(serializer=MsgSpecSerializer())
- Subscriber iteration support. This feature supports all middlewares and other FastStream features.
    subscriber = broker.subscriber(...)
    await subscriber.start()
    async for msg in subscriber:
        ...
Deprecation removed
- @broker.subscriber(..., filters=...) removed
- message.decoded_body removed, use await message.decode() instead
- publish(..., rpc=True) removed, use broker.request() instead
- RabbitMQ @broker.subscriber(..., reply_config=...) removed, use Response instead
Related Issues
- fixes #1742
- close #1228
- close #980
- feature #1895
- fixes #1954
- close #1646
- fixes #1625
- close #1904
- close #1507
- close #2056
- close #1308
- close #1901
- close #2029
- close #2094
- close #1881
- close #2216
- close #2215
- close #2031
- close #2178
- close #2239
- fixes #1036
New Contributors
- @stepanbobrik made their first contribution in #2381
Full Changelog: 0.5.48...0.6.0rc0