Admin Client support
v0.11.5 is a feature release that adds support for the Kafka Admin API (KIP-4).
Admin API
This release adds support for the Admin API, enabling applications and users to perform administrative Kafka tasks programmatically:
- Create topics - specifying partition count, replication factor and topic configuration.
- Delete topics - remove topics from the cluster.
- Create partitions - extend a topic with additional partitions.
- Alter configuration - set, modify or delete configuration for any Kafka resource (topic, broker, etc.).
- Describe configuration - view configuration for any Kafka resource.
The API closely follows the Java Admin API:
from confluent_kafka.admin import NewTopic

def example_create_topics(a, topics):
    # a is an AdminClient instance; topics is a list of topic names
    new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
    # Call create_topics to asynchronously create topics;
    # a dict of <topic, future> is returned.
    fs = a.create_topics(new_topics)
    # Wait for each operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))
Additional examples can be found in examples/adminapi
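The other operations listed above return a dict of futures in the same way, so the waiting logic can be shared. The following is a minimal sketch, not taken from the release itself: wait_for_all is a hypothetical helper name, and a is assumed to be an AdminClient created by the application. The functions only rely on the methods of the object passed in, so they can be tried against a stub before pointing them at a real cluster.

```python
# Sketch only: `a` is assumed to be a confluent_kafka.admin.AdminClient, e.g.
#   from confluent_kafka.admin import AdminClient, ConfigResource
#   a = AdminClient({'bootstrap.servers': 'localhost:9092'})  # hypothetical broker address


def wait_for_all(fs):
    """Block on every future in fs (hypothetical helper, not part of the library).

    Returns (results, errors): results maps key -> future result,
    errors maps key -> the exception raised by that future.
    """
    results, errors = {}, {}
    for key, f in fs.items():
        try:
            results[key] = f.result()
        except Exception as e:
            errors[key] = e
    return results, errors


def example_delete_topics(a, topics):
    # delete_topics is asynchronous and returns a dict of futures keyed by topic name.
    fs = a.delete_topics(topics, operation_timeout=30)
    deleted, failed = wait_for_all(fs)
    for topic in deleted:
        print("Topic {} deleted".format(topic))
    for topic, e in failed.items():
        print("Failed to delete topic {}: {}".format(topic, e))
    return deleted, failed


def example_describe_configs(a, resources):
    # resources is a list of ConfigResource objects,
    # e.g. [ConfigResource('topic', 'mytopic')] ('mytopic' is a placeholder name).
    fs = a.describe_configs(resources)
    described, failed = wait_for_all(fs)
    # Each successful result is a dict of config entries keyed by config name;
    # flatten it to plain name -> value pairs for easy printing.
    values = {res: {name: entry.value for name, entry in configs.items()}
              for res, configs in described.items()}
    return values, failed
```

Returning the failures instead of only printing them lets the caller decide whether a failed topic should abort the run or simply be retried.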
Enhancements
- Schema Registry HTTPS support with TLS client auth added (#90)
- Metadata API list_topics() added (#161, @tbsaunde, @stephan-hof)
- Expose librdkafka built-in partitioner options directly (#396)
- Callback-based throttle event handling: throttle_cb (#237) (#377)
- Added Unicode support for header values (#382)
- OpenSSL version bump to 1.0.2o (#410)
- Avro documentation added to the docs (#382)
- Python 3.7 support (#382)
- Allow passing headers as both list(tuples) and dict() (#355)
- Support for legacy setuptools' install_requires (#399)
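As an illustration of the new list_topics() metadata call, the sketch below counts partitions per topic. summarize_topics is a hypothetical helper name, and client is assumed to be any client object exposing list_topics(), such as a Producer or Consumer; the 10 second timeout is an arbitrary choice.

```python
def summarize_topics(client, timeout=10):
    """Return {topic_name: partition_count} for topics without a metadata error.

    `client` is assumed to expose list_topics(), which returns cluster
    metadata whose `topics` attribute is a dict keyed by topic name.
    """
    md = client.list_topics(timeout=timeout)
    counts = {}
    for name, topic in md.topics.items():
        if topic.error is not None:
            # Skip topics the broker reported an error for.
            continue
        counts[name] = len(topic.partitions)
    return counts
```

With a real client this would be called as summarize_topics(p), where p is a Producer configured with the application's own bootstrap.servers setting.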
Fixes
- Release GIL before making blocking calls (#412)
- Prevent application config dict mutation (#412)
- Intercept plugin configurations to ensure proper ordering (#404)
- Schema Registry client test_compatibility() now returns False, not None, when unable to check compatibility (#372, @Enether)
- Fix invocation of SchemaParseException (#376)
- Fix call ordering to avoid callback crash on implicit close (#265)
- Fix memory leaks in generic client setters (#382)
- Fix AvroProducer/AvroConsumer key/value identity check (#342)
- Correct Producer.produce documentation to use the correct time unit of seconds (#384) (#385)
- Fix KafkaError refcounting which could lead to memory leaks (#382)