github ag2ai/faststream 0.6.0rc0
v0.6.0rc0

one month ago

Description

FastStream 0.6 is a significant technical release that aimed to address many of the current project design issues and unlock further improvements on the path to version 1.0.0. We tried our best to minimize breaking changes, but unfortunately, some aspects were simply not working well. Therefore, we decided to break them in order to move forward.

This release includes:

  • Finalized Middleware API
  • Finalized Router API
  • Introduced dynamic subscribers
  • Added support for various serializer backends (such as Msgspec)
  • Support for AsyncAPI 3.0 specification
  • A range of minor refactors and improvements

The primary goal of this release is to unlock the path towards further features. Therefore, we are pleased to announce that after this release, we plan to work on MQTT #956 and SQS #794 support and move towards version 1.0.0!

Breaking changes

Firstly, we have dropped support for Python 3.8 and Python 3.9. Python 3.9 is almost at the end of its life cycle, so it's a good time to update our minimum version.

FastStream object changes

The broker has become a POSITIONAL-ONLY argument. This means that FastStream(broker=broker) is no longer valid. You should always pass the broker as the first positional argument, like FastStream(broker).

This is a preparatory step for FastStream(*brokers) support, which will be introduced in 1.0.0.
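
Positional-only parameters are a standard Python feature (the `/` marker in a signature). The sketch below uses a hypothetical `App` class, which is only a stand-in and not FastStream's actual constructor, to show why the keyword form now fails:

```python
class App:
    """Hypothetical stand-in for an application object (not FastStream's real class)."""

    def __init__(self, broker, /, *, title="app"):
        # Parameters before "/" can only be passed by position.
        self.broker = broker
        self.title = title


def try_keyword_form():
    """Return the error raised when the broker is passed by keyword."""
    try:
        App(broker="my-broker")
    except TypeError as exc:
        return exc
    return None
```

Calling `App("my-broker")` works, while `try_keyword_form()` returns a `TypeError`, which is the same kind of failure the keyword form produces after the upgrade.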

AsyncAPI changes

In 0.6, you can't directly pass custom AsyncAPI options to the FastStream constructor anymore.

app = FastStream(   # doesn't work anymore
    ...,
    title="My App",
    version="1.0.0",
    description="Some description",
)

You need to create a specification object and pass it manually to the constructor.

from faststream import FastStream, AsyncAPI

FastStream(
    ...,
    specification=AsyncAPI(
        title="My App",
        version="1.0.0",
        description="Some description",
    )
)

Retry feature removed

Previously, you were able to configure retry attempts for a handler by using the following option:

@broker.subscriber("in", retry=True)  # was removed
async def handler(): ...

Unfortunately, this option was a design mistake. We apologize for any confusion it may have caused. Technically, it was just a shortcut to message.nack() on error. We have decided that manual acknowledgement control would be more idiomatic and better for the framework. Therefore, we have provided a new feature in its place: ack_policy control.

@broker.subscriber("test", ack_policy=AckPolicy.ACK_FIRST)
async def handler() -> None: ...

With ack_policy, you can now control the default acknowledge behavior for your handlers. AckPolicy offers the following options:

  • REJECT_ON_ERROR (default) – to permanently discard messages on failure.
  • NACK_ON_ERROR – to redeliver messages in case of failure.
  • ACK_FIRST – for scenarios with high throughput where some message loss can be acceptable.
  • ACK – if you want the message to be acknowledged, regardless of success or failure.
  • MANUAL – to control message acknowledgment entirely by hand (for example, by calling message.ack() yourself).
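
To make the differences between these options concrete, here is a toy model of how each policy could react to a handler outcome. It is a sketch, not FastStream's internals: `AckPolicy` is a local stand-in enum that only mirrors the names above, `FakeMessage`, `consume` and `outcome` are hypothetical helpers, and the behavior on success (a plain ack) is an assumption.

```python
import asyncio
from enum import Enum, auto


class AckPolicy(Enum):
    """Local stand-in that mirrors the option names listed above."""

    REJECT_ON_ERROR = auto()
    NACK_ON_ERROR = auto()
    ACK_FIRST = auto()
    ACK = auto()
    MANUAL = auto()


class FakeMessage:
    """Hypothetical message that records acknowledgement calls."""

    def __init__(self):
        self.calls = []

    async def ack(self):
        self.calls.append("ack")

    async def nack(self):
        self.calls.append("nack")

    async def reject(self):
        self.calls.append("reject")


async def consume(policy, handler, message):
    """Run the handler and acknowledge according to the policy (toy model)."""
    if policy is AckPolicy.ACK_FIRST:
        await message.ack()  # acknowledged before any processing happens
    try:
        await handler(message)
    except Exception:
        # The toy model swallows the error after deciding what to do.
        if policy is AckPolicy.REJECT_ON_ERROR:
            await message.reject()
        elif policy is AckPolicy.NACK_ON_ERROR:
            await message.nack()
        elif policy is AckPolicy.ACK:
            await message.ack()
    else:
        if policy in (AckPolicy.REJECT_ON_ERROR, AckPolicy.NACK_ON_ERROR, AckPolicy.ACK):
            await message.ack()


def outcome(policy, *, fails):
    """Return the acknowledgement calls made for a succeeding or failing handler."""

    async def handler(_message):
        if fails:
            raise RuntimeError("handler failed")

    message = FakeMessage()
    asyncio.run(consume(policy, handler, message))
    return message.calls
```

In this model a failing handler ends in a reject, a nack or an ack depending on the policy, while MANUAL leaves the message untouched so that the handler decides.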

In addition, we have deprecated a few more options in favor of ack_policy.

  • ack_first=True -> AckPolicy.ACK_FIRST
  • no_ack=True -> AckPolicy.MANUAL
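
When migrating a larger codebase, the two replacements above can be captured in a small lookup. The helper below is a hypothetical migration aid, not part of FastStream; it only encodes the mapping listed here and returns the replacement as text to paste into the subscriber call.

```python
# Mapping taken from the two deprecations listed above.
DEPRECATED_FLAGS = {
    "ack_first": "AckPolicy.ACK_FIRST",
    "no_ack": "AckPolicy.MANUAL",
}


def suggest_ack_policy(subscriber_kwargs):
    """Return replacement ack_policy expressions for deprecated flags set to True."""
    return [
        replacement
        for flag, replacement in DEPRECATED_FLAGS.items()
        if subscriber_kwargs.get(flag) is True
    ]
```

For example, `suggest_ack_policy({"ack_first": True})` suggests `AckPolicy.ACK_FIRST`; options that are not in the mapping are ignored.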

Context changes

We have made some changes to our Dependency Injection system, so the global context is no longer available.

You can no longer import the context from anywhere and use it freely.

from faststream import context  # was removed

Instead, you should create the context in a slightly different way. The FastStream object serves as an entry point for this, so you can place it wherever you need it:

from typing import Annotated

from faststream import Context, ContextRepo, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()

app = FastStream(
    broker,
    context=ContextRepo({
        "global_dependency": "value",
    }),
)

Everything else about using the context remains the same. You can still request values from it anywhere dependency injection is supported.

Additionally, Context("broker") and Context("logger") have been moved to the local context. They cannot be accessed from lifespan hooks any longer.

@app.after_startup
async def start(
    broker: Broker   # does not work anymore
): ...

@router.subscriber
async def handler(
    broker: Broker   # still working
): ...

This change was also made to support multiple brokers.

Middlewares changes

Also, we have finalized our Middleware API. It now supports all the features we wanted, and we have no plans to change it anymore. First of all, the BaseMiddleware class constructor requires a context (which is no longer global).

class BaseMiddleware:
    def __init__(self, msg: Any | None, context: ContextRepo) -> None:
        self.msg = msg
        self.context = context

The context is now available as self.context in all middleware methods.

We also changed the publish_scope function signature.

class BaseMiddleware:   # old signature
    async def publish_scope(
        self,
        call_next: "AsyncFunc",
        msg: Any,
        *args: Any,
        **kwargs: Any,
    ) -> Any: ...

Previously, any options passed to broker.publish("msg", "destination") had to be consumed as *args, **kwargs.

Now, you can consume them all as a single PublishCommand object.

from faststream import PublishCommand

class BaseMiddleware:
    async def publish_scope(
        self,
        call_next: Callable[[PublishCommand], Awaitable[Any]],
        cmd: PublishCommand,
    ) -> Any: ...

Thanks to Python 3.13's TypeVars with defaults, BaseMiddleware becomes a generic class and you can specify the PublishCommand for the broker you want to work with.

from faststream.rabbit import RabbitPublishCommand

class Middleware(BaseMiddleware[RabbitPublishCommand]):
    async def publish_scope(
        self,
        call_next: Callable[[RabbitPublishCommand], Awaitable[Any]],
        cmd: RabbitPublishCommand,
    ) -> Any: ...
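
To illustrate the single-command publish_scope shape without a running broker, here is a self-contained sketch. `FakePublishCommand` is a hypothetical stand-in for PublishCommand, and its field names (`body`, `destination`, `headers`) are assumptions made for the example; `TracingMiddleware` and `fake_publish` are likewise illustrative only.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakePublishCommand:
    """Hypothetical stand-in for PublishCommand; field names are assumptions."""

    body: Any
    destination: str
    headers: dict[str, str] = field(default_factory=dict)


class TracingMiddleware:
    """Toy middleware that follows the new publish_scope signature."""

    async def publish_scope(
        self,
        call_next: Callable[[FakePublishCommand], Awaitable[Any]],
        cmd: FakePublishCommand,
    ) -> Any:
        # Adjust the command object instead of juggling *args / **kwargs.
        cmd.headers["x-trace-id"] = "demo"
        return await call_next(cmd)


async def fake_publish(cmd: FakePublishCommand) -> str:
    """Pretend producer: reports what would have been sent."""
    return f"{cmd.destination}:{cmd.body}:{cmd.headers['x-trace-id']}"


async def demo() -> str:
    cmd = FakePublishCommand(body="msg", destination="destination")
    return await TracingMiddleware().publish_scope(fake_publish, cmd)
```

Running `asyncio.run(demo())` passes the modified command down the chain, so every publishing option travels in one object that the middleware can inspect or change.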

Warning: The methods on_consume, after_consume, on_publish and after_publish will be deprecated and removed in version 0.7. Please use consume_scope and publish_scope instead.

Redis Default Message format changes

FastStream 0.6 uses BinaryMessageFormatV1 as the default Redis message format instead of JSONMessageFormat.
You can find more details in the documentation: https://faststream.ag2.ai/latest/redis/message_format/

New Features:

  1. AsyncAPI 3.0 support – now you can choose between AsyncAPI(schema_version="3.0.0") (default) and AsyncAPI(schema_version="2.6.0") schema generation

  2. Msgspec native support

    from fast_depends.msgspec import MsgSpecSerializer
    
    broker = Broker(serializer=MsgSpecSerializer())

  3. Subscriber iteration support. This feature supports all middlewares and other FastStream features.

    subscriber = broker.subscriber(...)
    
    await subscriber.start()
    
    async for msg in subscriber:
        ...

Removed deprecations

  1. @broker.subscriber(..., filters=...) removed
  2. message.decoded_body removed, use await message.decode() instead
  3. publish(..., rpc=True) removed, use broker.request() instead
  4. RabbitMQ @broker.subscriber(..., reply_config=...) removed, use Response instead

Related Issues

  1. fixes #1742
  2. close #1228
  3. close #980
  4. fixes #1742
  5. feature #1895
  6. fixes #1954
  7. close #1646
  8. fixes #1625
  9. close #1904
  10. close #1507
  11. close #2056
  12. close #1308
  13. close #1901
  14. close #2029
  15. close #2094
  16. close #1881
  17. close #2216
  18. close #2215
  19. close #2031
  20. close #2178
  21. close #2239
  22. fixes #1036

Full Changelog: 0.5.48...0.6.0rc0
