github ag2ai/faststream 0.7.0
v0.7.0

6 hours ago

What's Changed

🚀 MQTT Support

FastStream now includes a full-featured MQTT broker, installable via pip install faststream[mqtt]. It supports wildcard topic filters, path parameter capture via Path(), QoS levels, per-subscriber ack_policy, and AsyncAPI schema generation.

from typing import Annotated

from faststream import FastStream, Path
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS

broker = MQTTBroker("localhost:1883")
app = FastStream(broker)

@broker.subscriber(
    "sensors/{device_id}/temperature",
    qos=QoS.AT_LEAST_ONCE,
)
async def on_temperature(body: str, device_id: Annotated[str, Path()]) -> None:
    print(device_id, body)

@app.after_startup
async def publish_demo() -> None:
    await broker.publish(21.5, "sensors/room1/temperature", qos=QoS.AT_LEAST_ONCE)
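
To make the wildcard and path-parameter behaviour concrete, here is a minimal standalone sketch of MQTT topic matching. It is not FastStream's implementation: match_topic is a hypothetical helper, and it assumes a {name} segment behaves like the single-level wildcard + while also recording the matched value. Special handling of topics that begin with $ is left out.

```python
from __future__ import annotations


def match_topic(topic_filter: str, topic: str) -> dict[str, str] | None:
    """Return captured parameters if `topic` matches `topic_filter`, else None.

    Hypothetical helper that illustrates MQTT filter semantics only.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    params: dict[str, str] = {}

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent topic and everything below it.
            return params if index == len(filter_levels) - 1 else None
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return None
        value = topic_levels[index]
        if level == "+":
            # Single-level wildcard: any value, nothing captured.
            continue
        if level.startswith("{") and level.endswith("}"):
            # Assumed behaviour: a named segment matches one level and keeps it.
            params[level[1:-1]] = value
            continue
        if level != value:
            return None

    # Without a trailing "#", both sides must have the same number of levels.
    return params if len(filter_levels) == len(topic_levels) else None


if __name__ == "__main__":
    print(match_topic("sensors/{device_id}/temperature", "sensors/room1/temperature"))
    print(match_topic("sensors/+/temperature", "sensors/room1/humidity"))
```

A match without captures returns an empty dict, so callers should compare the result against None rather than rely on truthiness.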

🔀 Multi-broker Support

A single FastStream application can now run multiple brokers at the same time. Pass all the brokers directly to the FastStream constructor — each keeps its own subscribers and publishers, and the app starts and stops all of them together. A common use case is bridging two systems: consume from one broker and re-publish to another.

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.nats import NatsBroker

kafka_broker = KafkaBroker("localhost:9092")
nats_broker = NatsBroker("nats://localhost:4222")

app = FastStream(kafka_broker, nats_broker)

@kafka_broker.subscriber("incoming")
@nats_broker.publisher("outgoing")
async def from_kafka(msg: str) -> str:
    # Bridge the message from Kafka to NATS
    return msg

@nats_broker.subscriber("outgoing")
async def from_nats(msg: str) -> None:
    print(f"Received from NATS: {msg}")

🗄️ Redis Cluster Support

FastStream's Redis broker now has a dedicated RedisClusterBroker that connects to a Redis Cluster with automatic node discovery. It is a drop-in replacement for RedisBroker — just change the class name and point it at any cluster node.

from faststream import FastStream
from faststream.redis import RedisClusterBroker

# A single URL is enough — the cluster auto-discovers all remaining nodes
broker = RedisClusterBroker("redis://node1:7000")
app = FastStream(broker)

@broker.subscriber("events")
async def handle_event(msg: str) -> None:
    print(f"Received: {msg}")

@app.after_startup
async def publish_event() -> None:
    await broker.publish("hello from cluster", "events")

⚠️ Breaking Changes

AsyncAPIRoute parameter renames (PR #2894)

The AsyncAPIRoute class (used in ASGI hosting) replaces two try-it-out parameters with a single try_it_out_path parameter:

Before                  After                    Notes
try_it_out=False        try_it_out_path=None     Disabling try-it-out now uses None instead of False
try_it_out_url="..."    try_it_out_path="..."    Parameter renamed for clarity

# Before
AsyncAPIRoute("/docs/asyncapi", try_it_out=False)
AsyncAPIRoute("/docs/asyncapi", try_it_out_url="https://api.example.com/asyncapi/try")

# After
AsyncAPIRoute("/docs/asyncapi", try_it_out_path=None)
AsyncAPIRoute("/docs/asyncapi", try_it_out_path="https://api.example.com/asyncapi/try")

Additionally, a new asyncapi_json_path parameter was added (defaults to <path>.json) and its position in the signature changed — use keyword arguments to avoid surprises.
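
For codebases that build AsyncAPIRoute arguments in several places, the rename can be applied mechanically. The sketch below is one possible approach, not part of FastStream: migrate_asyncapi_route_kwargs is a hypothetical helper that rewrites a dict of keyword arguments following the renames listed above. It assumes that try_it_out=True with no URL should fall back to the library default, so the old key is simply dropped in that case.

```python
from __future__ import annotations

from typing import Any


def migrate_asyncapi_route_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Translate pre-0.7.0 AsyncAPIRoute keyword arguments to the 0.7.0 names.

    Hypothetical helper; it only rewrites the dict and never imports FastStream.
    """
    migrated = dict(kwargs)  # copy so the caller's dict is left untouched
    enabled = migrated.pop("try_it_out", None)
    url = migrated.pop("try_it_out_url", None)

    if enabled is False:
        # Disabling try-it-out is now expressed as try_it_out_path=None.
        migrated["try_it_out_path"] = None
    elif url is not None:
        # The old URL value moves to the renamed parameter unchanged.
        migrated["try_it_out_path"] = url
    # Assumption: try_it_out=True without a URL means "use the default".
    return migrated


if __name__ == "__main__":
    print(migrate_asyncapi_route_kwargs({"try_it_out": False}))
    print(migrate_asyncapi_route_kwargs(
        {"try_it_out_url": "https://api.example.com/asyncapi/try"}
    ))
```

The result can then be passed by keyword, for example AsyncAPIRoute("/docs/asyncapi", **migrated), which also sidesteps the change in positional order.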


RabbitMQ: durable=True is now the default (PR #2892)

RabbitQueue and RabbitExchange now default to durable=True (previously False). This aligns with RabbitMQ 4.3+, which disables transient non-exclusive queues by default.

Impact: if you already have a transient (non-durable) queue or exchange of the same name declared on your broker, re-declaration will raise a PRECONDITION_FAILED mismatch error. To opt out, pass durable=False explicitly:

from faststream.rabbit import RabbitQueue

# To keep the old transient behavior:
queue = RabbitQueue("my-queue", durable=False)
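
The mismatch error comes from the broker comparing a re-declaration against the properties the queue already has. The toy model below illustrates that rule without a running RabbitMQ; ToyBroker and PreconditionFailed are hypothetical stand-ins, not FastStream or RabbitMQ APIs, and only the durable flag is modelled.

```python
from __future__ import annotations


class PreconditionFailed(Exception):
    """Stand-in for the broker's PRECONDITION_FAILED error."""


class ToyBroker:
    """Remembers the durability each queue was first declared with."""

    def __init__(self) -> None:
        self._queues: dict[str, bool] = {}

    def declare_queue(self, name: str, durable: bool = True) -> None:
        existing = self._queues.get(name)
        if existing is None:
            # First declaration: the queue is created with this setting.
            self._queues[name] = durable
        elif existing != durable:
            # Re-declaration with a different setting is rejected.
            raise PreconditionFailed(
                f"queue {name!r} exists with durable={existing}, got durable={durable}"
            )


if __name__ == "__main__":
    broker = ToyBroker()
    broker.declare_queue("my-queue", durable=False)  # created under the old default
    try:
        broker.declare_queue("my-queue")  # new default: durable=True
    except PreconditionFailed as error:
        print("rejected:", error)
    broker.declare_queue("my-queue", durable=False)  # explicit opt-out still matches
```

In practice the options are to pass durable=False as shown above, or to delete and recreate the existing queue or exchange as durable.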

Deprecated items removed

The following APIs that were deprecated in earlier 0.x releases have been fully removed in 0.7.0:

  • Publisher/subscriber-level middlewares — use broker-level or app-level middlewares instead.
  • ack_first, no_ack and related subscriber options — replaced by ack_policy=AckPolicy.*
  • RedisJSONMessageParser — removed. All Redis services must now use the binary message format.
  • broker.close() — removed. Use broker.stop() instead.

Features

Bug Fixes

  • fix: include pattern subscribers in AsyncAPI specification by @aazmv in #2813
  • fix: cli preserve import errors by @vovkka in #2817
  • fix: security parsing for mqtt broker by @lemmehoop in #2832
  • fix: propagate outer context to nested StreamRouter on include by @lesnik512 in #2828
  • fix: parsing pydantic models by @ApusBerliozi in #2847
  • fix: try-it-out request timeout and NATS fake subscriber stream by @Lancetnik in #2853
  • fix: handle NOGROUP error on Redis stream subscriber by @powersemmi in #2855
  • fix: logger not passed to Confluent Producer and AdminClient by @mara-werils in #2859
  • fix: encode unsafe AsyncAPI reference path parts, including {} and / by @borisalekseev in #2872
  • fix: register POST {schema_url}/try for AsyncAPI try-it-out by @sfrangulov in #2876
  • fix: consistent hashing and equality for RabbitMQ schemas by @RinZ27 in #2796
  • fix: default RabbitQueue and RabbitExchange to durable=True by @Lancetnik in #2892

Documentation

  • docs: images generation in release notes by @Lancetnik in #2792
  • docs: add multiple topics registration with a single call by @benaduo in #2814
  • docs: add How-To section placeholders for RabbitMQ, Confluent, and Redis by @benaduo in #2815
  • docs: change polling_interval units (seconds -> milliseconds) by @MikhailWar in #2821
  • docs: document per-message attributes via KafkaPublishMessage in publish_batch by @Bazarovinc in #2851
  • docs: cover mqtt examples by tests by @borisalekseev in #2888
  • docs: add multiple brokers support page by @Lancetnik in #2896

Chore / CI

New Contributors

Full Changelog: 0.6.7...0.7.0
