What's Changed
This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.
This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:
pip install faststream==0.5.0rc0
We look forward to your feedback!
New features:
-
await FastStream.stop()
method andStopApplication
exception to stop aFastStream
worker are added. -
broker.subscriber()
androuter.subscriber()
functions now return aSubscriber
object you can use later.
subscriber = broker.subscriber("test")
@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
...
@subscriber()
async def handler(msg: dict[str, Any]):
...
This is the preferred syntax for filtering now (the old one will be removed in 0.6.0
)
- The
router.publisher()
function now returns the correctPublisher
object you can use later (after broker startup).
publisher = router.publisher("test")
@router.subscriber("in")
async def handler():
await publisher.publish("msg")
(Until 0.5.0
you could use it in this way with broker.publisher
only)
- A list of
middlewares
can be passed to abroker.publisher
as well:
broker = Broker(..., middlewares=())
@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=()) # new feature
async def handler():
...
-
Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.
-
⚠️ BREAKING CHANGE ⚠️ : both
subscriber
andpublisher
middlewares should be async context manager type
from contextlib import asynccontextmanager
@asynccontextmanager
async def subscriber_middleware(msg_body):
yield msg_body
@asynccontextmanager
async def publisher_middleware(
msg_to_publish,
**publish_arguments,
):
yield msg_to_publish
@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
...
-
A better FastAPI compatibility:
fastapi.BackgroundTasks
andresponse_class
subscriber option are supported. -
All
.pyi
files are removed, and explicit docstrings and methods options are added. -
New subscribers can be registered in runtime (with an already-started broker):
subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
faststream[docs]
distribution is removed.
- Update Release Notes for 0.4.7 by @faststream-release-notes-updater in #1295
- 1129 - Create a publish command for the CLI by @MRLab12 in #1151
- Chore: packages upgraded by @davorrunje in #1306
- docs: fix typos by @omahs in #1309
- chore: update dependencies by @Lancetnik in #1323
- docs: fix misc by @Lancetnik in #1324
- docs (#1327): correct RMQ exhcanges behavior by @Lancetnik in #1328
- fix: typer 0.12 exclude by @Lancetnik in #1341
- 0.5.0 by @Lancetnik in #1326
- close #1103
- close #840
- fix #690
- fix #1206
- fix #1227
- close #568
- close #1303
- close #1287
- feat #607
New Contributors
Full Changelog: 0.4.7...0.5.0rc0