github ag2ai/faststream 0.5.0rc0
v0.5.0rc0

latest releases: 0.6.0rc2, 0.6.0rc1, 0.6.0rc0...
pre-release17 months ago

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:

pip install faststream==0.5.0rc0

We look forward to your feedback!

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...
 
@subscriber()
async def handler(msg: dict[str, Any]):
    ...

This is the preferred syntax for filtering now (the old one will be removed in 0.6.0)

  1. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).
publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Until 0.5.0 you could use it in this way with broker.publisher only)

  1. A list of middlewares can be passed to a broker.publisher as well:
broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...
  1. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

  2. ⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares should be async context manager type

from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
  1. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  2. All .pyi files are removed, and explicit docstrings and methods options are added.

  3. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

New Contributors

Full Changelog: 0.4.7...0.5.0rc0

Don't miss a new faststream release

NewReleases is sending notifications on new releases.