What's Changed
This release adds support for Confluent's Python Client for Apache Kafka (TM). Confluent's client does not natively support async functions, which makes its integration with modern async-based services a bit trickier. That is why our initial Kafka broker support used aiokafka. However, that choice turned out to be a less fortunate one, as aiokafka is not as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet, create an async wrapper around Confluent's Python Client, and add full support for it in FastStream.
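To illustrate the general idea of wrapping a blocking client for async code, here is a minimal sketch. It is not FastStream's actual implementation: BlockingConsumer, AsyncConsumer, and getone are hypothetical names invented for this example, and the blocking client is simulated so the snippet runs without a Kafka installation. The sketch assumes the usual approach of handing the blocking call to a worker thread so the event loop is not stalled.

```python
import asyncio
import time


class BlockingConsumer:
    """Hypothetical stand-in for a synchronous Kafka client (not a real API)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout: float):
        # Simulate a blocking network call, then return the next message or None.
        time.sleep(min(timeout, 0.01))
        return self._messages.pop(0) if self._messages else None


class AsyncConsumer:
    """Exposes the blocking consumer through an awaitable method."""

    def __init__(self, consumer: BlockingConsumer):
        self._consumer = consumer

    async def getone(self, timeout: float = 1.0):
        # Run the blocking poll in the default thread pool so other
        # coroutines can keep running while we wait for a message.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._consumer.poll, timeout)


async def main():
    consumer = AsyncConsumer(BlockingConsumer(["a", "b"]))
    # The third call finds the queue empty and yields None.
    return [await consumer.getone() for _ in range(3)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off in a design like this is that each call occupies a worker thread for its duration, in exchange for keeping the event loop responsive.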
To try out the Confluent support, first install FastStream with the confluent extra:
pip install "faststream[confluent]>=0.4.0"
To connect to Kafka using the FastStream KafkaBroker module, follow these steps:
- Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including the Kafka broker address.
- Create your processing logic: Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic.
- Decorate your processing function: To connect your processing function to the desired Kafka topics, decorate it with the @broker.subscriber(...) and @broker.publisher(...) decorators. Once you start your application, your processing function will be called whenever a new message is available in the subscribed topic, and its return value will be produced to the topic defined in the publisher decorator.
Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"
For more information, please visit the documentation at:
https://faststream.airt.ai/latest/confluent/
List of Changes
- Update Release Notes for 0.3.13 by @faststream-release-notes-updater in #1119
- docs: close #1125 by @Lancetnik in #1126
- Add support for confluent python lib by @kumaranvpl in #1042
- Update tutorial docs to include confluent code examples by @kumaranvpl in #1131
- Add installation instructions for confluent by @kumaranvpl in #1132
- Update Release Notes for 0.4.0rc0 by @faststream-release-notes-updater in #1130
- chore: remove useless branch from CI by @Lancetnik in #1135
- chore: bump mkdocs-git-revision-date-localized-plugin from 1.2.1 to 1.2.2 by @dependabot in #1140
- chore: strict fast-depends version by @Lancetnik in #1145
- chore: update copyright by @Lancetnik in #1144
- fix: correct Windows shutdown by @Lancetnik in #1148
- docs: fix typo by @saroz014 in #1154
- Middleware Document Syntax Error by @SepehrBazyar in #1156
- fix: correct FastAPI Context type hints by @Lancetnik in #1155
- Fix bug which results in lost confluent coverage report by @kumaranvpl in #1160
- Fix failing ack tests for confluent by @kumaranvpl in #1166
- Update version to 0.4.0 and update docs by @kumaranvpl in #1171
- feat #1180: add StreamRouter.on_broker_shutdown hook by @Lancetnik in #1182
- Fix bug - using old upload-artifact version by @kumaranvpl in #1183
- Release 0.4.0 by @davorrunje in #1184
New Contributors
Full Changelog: 0.3.13...0.4.0