What's Changed
🚀 MQTT Support
FastStream now includes a full-featured MQTT broker, installable via pip install faststream[mqtt]. It supports wildcard topic filters, path parameter capture via Path(), QoS levels, per-subscriber ack_policy, and AsyncAPI schema generation.
from faststream import FastStream, Path
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS
broker = MQTTBroker("localhost:1883")
app = FastStream(broker)
@broker.subscriber(
"sensors/{device_id}/temperature",
qos=QoS.AT_LEAST_ONCE,
)
async def on_temperature(body: str, device_id: Annotated[str, Path()]) -> None:
print(device_id, body)
@app.after_startup
async def publish_demo() -> None:
await broker.publish(21.5, "sensors/room1/temperature", qos=QoS.AT_LEAST_ONCE)🔀 Multi-broker Support
A single FastStream application can now run multiple brokers at the same time. Pass all the brokers directly to the FastStream constructor — each keeps its own subscribers and publishers, and the app starts and stops all of them together. A common use case is bridging two systems: consume from one broker and re-publish to another.
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.nats import NatsBroker
kafka_broker = KafkaBroker("localhost:9092")
nats_broker = NatsBroker("nats://localhost:4222")
app = FastStream(kafka_broker, nats_broker)
@kafka_broker.subscriber("incoming")
@nats_broker.publisher("outgoing")
async def from_kafka(msg: str) -> str:
# Bridge the message from Kafka to NATS
return msg
@nats_broker.subscriber("outgoing")
async def from_nats(msg: str) -> None:
print(f"Received from NATS: {msg}")🗄️ Redis Cluster Support
FastStream's Redis broker now has a dedicated RedisClusterBroker that connects to a Redis Cluster with automatic node discovery. It is a drop-in replacement for RedisBroker — just change the class name and point it at any cluster node.
from faststream import FastStream
from faststream.redis import RedisClusterBroker
# A single URL is enough — the cluster auto-discovers all remaining nodes
broker = RedisClusterBroker("redis://node1:7000")
app = FastStream(broker)
@broker.subscriber("events")
async def handle_event(msg: str) -> None:
print(f"Received: {msg}")
@app.after_startup
async def publish_event() -> None:
await broker.publish("hello from cluster", "events")⚠️ Breaking Changes
AsyncAPIRoute parameter renames (PR #2894)
The AsyncAPIRoute class (used in ASGI hosting) has had two parameters renamed:
| Before | After | Notes |
|---|---|---|
try_it_out=False
| try_it_out_path=None
| Disabling try-it-out now uses None instead of False
|
try_it_out_url="..."
| try_it_out_path="..."
| Parameter renamed for clarity |
# Before
AsyncAPIRoute("/docs/asyncapi", try_it_out=False)
AsyncAPIRoute("/docs/asyncapi", try_it_out_url="https://api.example.com/asyncapi/try")
# After
AsyncAPIRoute("/docs/asyncapi", try_it_out_path=None)
AsyncAPIRoute("/docs/asyncapi", try_it_out_path="https://api.example.com/asyncapi/try")Additionally, a new asyncapi_json_path parameter was added (defaults to <path>.json) and its position in the signature changed — use keyword arguments to avoid surprises.
RabbitMQ: durable=True is now the default (PR #2892)
RabbitQueue and RabbitExchange now default to durable=True (previously False). This aligns with RabbitMQ 4.3+ which disables transient non-exclusive queues by default.
Impact: if you already have a transient (non-durable) queue or exchange of the same name declared on your broker, re-declaration will raise a PRECONDITION_FAILED mismatch error. To opt out, pass durable=False explicitly:
from faststream.rabbit import RabbitQueue
# To keep the old transient behavior:
queue = RabbitQueue("my-queue", durable=False)Deprecated items removed
The following APIs that were deprecated in earlier 0.x releases have been fully removed in 0.7.0:
- Publisher/subscriber-level middlewares — use broker-level or app-level middlewares instead.
ack_first,no_ackand related subscriber options — replaced byack_policy=AckPolicy.*RedisJSONMessageParser— removed. All Redis services must now use the binary message format.broker.close()— removed. Usebroker.stop()instead.
Features
- feat: FastStream[mqtt] by @borisalekseev in #2819
- feat: support broker-level ack_policy with per-subscriber override by @ce1ebrimbor in #2827
- feat: codec wiring unification by @ce1ebrimbor in #2841
- feat: add mqtt path support by @borisalekseev in #2873
- feat: expose client_rack option on the Kafka broker by @00yhj22-debug in #2871
- feat: allow aiokafka 0.14 by @00yhj22-debug in #2884
- feat: add fastapi mqtt router by @borisalekseev in #2887
- feat: add consumer_only flag to KafkaBroker by @00yhj22-debug in #2883
- feat: add Redis Cluster broker support by @powersemmi in #2854
- feat: wire codec.encode into all producers, add BatchCodecProto for batch-aware encoding by @ce1ebrimbor in #2850
- feat: add multibrokers support by @Lancetnik in #2867
- feat: add json endpoint and fix content-type header by @Cool-Cat09 in #2894
Bug Fixes
- fix: include pattern subscribers in AsyncAPI specification by @aazmv in #2813
- fix: cli preserve import errors by @vovkka in #2817
- fix: security parsing for mqtt broker by @lemmehoop in #2832
- fix: propagate outer context to nested StreamRouter on include by @lesnik512 in #2828
- fix: parsing pydantic models by @ApusBerliozi in #2847
- fix: try-it-out request timeout and NATS fake subscriber stream by @Lancetnik in #2853
- fix: handle NOGROUP error on Redis stream subscriber by @powersemmi in #2855
- fix: logger not passed to Confluent Producer and AdminClient by @mara-werils in #2859
- fix: encode unsafe AsyncAPI reference path parts, including {} and / by @borisalekseev in #2872
- fix: register POST {schema_url}/try for AsyncAPI try-it-out by @sfrangulov in #2876
- fix: consistent hashing and equality for RabbitMQ schemas by @RinZ27 in #2796
- fix: default RabbitQueue and RabbitExchange to durable=True by @Lancetnik in #2892
Documentation
- docs: images generation in release notes by @Lancetnik in #2792
- docs: add multiple topics registration with a single call by @benaduo in #2814
- docs: add How-To section placeholders for RabbitMQ, Confluent, and Redis by @benaduo in #2815
- docs: change polling_interval units (seconds -> milliseconds) by @MikhailWar in #2821
- docs: document per-message attributes via KafkaPublishMessage in publish_batch by @Bazarovinc in #2851
- docs: cover mqtt examples by tests by @borisalekseev in #2888
- docs: add multiple brokers support page by @Lancetnik in #2896
Chore / CI
- chore: test basic 3.14 by @vvlrff in #2795
- chore: prepare 0.7.0 update by @borisalekseev in #2822
- chore: add MQTT code ownership for borisalekseev by @Lancetnik in #2825
- chore: add MQTT AsyncAPI tests by @Lancetnik in #2830
- chore: parser codec protocols by @ce1ebrimbor in #2839
- chore: merge schema by @aligeromachine in #2849
New Contributors
- @aazmv made their first contribution in #2813
- @vovkka made their first contribution in #2817
- @benaduo made their first contribution in #2814
- @MikhailWar made their first contribution in #2821
- @ce1ebrimbor made their first contribution in #2827
- @lemmehoop made their first contribution in #2832
- @lesnik512 made their first contribution in #2828
- @ApusBerliozi made their first contribution in #2847
- @Bazarovinc made their first contribution in #2851
- @mara-werils made their first contribution in #2859
- @00yhj22-debug made their first contribution in #2871
- @sfrangulov made their first contribution in #2876
- @RinZ27 made their first contribution in #2796
- @Cool-Cat09 made their first contribution in #2894
Full Changelog: 0.6.7...0.7.0