librdkafka v1.9.0
librdkafka v1.9.0 is a feature release:
- Added KIP-768 OAUTHBEARER OIDC support (by @jliunyu, #3560)
- Added KIP-140 Admin API ACL support (by @emasab, #2676)
Upgrade considerations
- Consumer:
`rd_kafka_offsets_store()` (et al.) will now return an error for any
partition that is not currently assigned (through `rd_kafka_*assign()`).
This prevents a race condition where an application would store offsets
after the assigned partitions had been revoked (which resets the stored
offset), which could cause these old stored offsets to be committed later
when the same partitions were assigned to this consumer again, effectively
overwriting any offsets committed by consumers that had previously been
assigned the same partitions. This would typically result in the offsets
rewinding and messages being reprocessed.
As an extra measure to avoid this situation, the stored offset is now
also reset when partitions are assigned (through `rd_kafka_*assign()`).
Applications that explicitly call `..offset*_store()` will now need
to handle the case where `RD_KAFKA_RESP_ERR__STATE` is returned
in the per-partition `.err` field, meaning the partition is no longer
assigned to this consumer and the offset could not be stored for commit;
a sketch of such handling follows below.
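A minimal sketch (not part of the release notes; the `store_offsets()` helper is hypothetical and error handling is simplified) of how an application might check the per-partition `.err` field after calling `rd_kafka_offsets_store()`:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* store_offsets() is a hypothetical helper: `rk` is an existing consumer
 * and `offsets` holds the offsets the application wants to store. */
static void store_offsets (rd_kafka_t *rk,
                           rd_kafka_topic_partition_list_t *offsets) {
        rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk, offsets);
        int i;

        if (err)
                fprintf(stderr, "offsets_store() failed: %s\n",
                        rd_kafka_err2str(err));

        for (i = 0; i < offsets->cnt; i++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];

                if (rktpar->err == RD_KAFKA_RESP_ERR__STATE) {
                        /* Partition is no longer assigned to this consumer:
                         * the offset was not stored and will not be
                         * committed. The consumer that now owns the
                         * partition is responsible for its offsets. */
                        fprintf(stderr,
                                "Not storing offset for unassigned %s [%d]\n",
                                rktpar->topic, (int)rktpar->partition);
                } else if (rktpar->err) {
                        fprintf(stderr,
                                "Failed to store offset for %s [%d]: %s\n",
                                rktpar->topic, (int)rktpar->partition,
                                rd_kafka_err2str(rktpar->err));
                }
        }
}
```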
Enhancements
- Improved producer queue scheduling. Fixes the performance regression
introduced in v1.7.0 for some produce patterns. (#3538, #2912)
- Windows: Added native Win32 IO/Queue scheduling. This removes the
internal TCP loopback connections that were previously used for timely
queue wakeups.
- Added `socket.connection.setup.timeout.ms` (default 30s).
The maximum time allowed for broker connection setups (TCP connection as
well as SSL and SASL handshakes) is now limited to this value.
This fixes the issue with stalled broker connections in the case of network
or load balancer problems.
The Java clients have an exponential backoff up to this timeout, which is
limited by `socket.connection.setup.timeout.max.ms`; this was not
implemented in librdkafka due to differences in connection handling and
`ERR__ALL_BROKERS_DOWN` error reporting. Having a lower initial connection
setup timeout and then increasing the timeout for the next attempt could
yield a false-positive `ERR__ALL_BROKERS_DOWN` too early.
- SASL OAUTHBEARER refresh callbacks can now be scheduled for execution
on librdkafka's background thread. This solves the problem where an
application has a custom SASL OAUTHBEARER refresh callback and thus needs to
call `rd_kafka_poll()` (et al.) at least once to trigger the
refresh callback before being able to connect to brokers.
With the new `rd_kafka_conf_enable_sasl_queue()` configuration API and
`rd_kafka_sasl_background_callbacks_enable()`, the refresh callbacks
can now be triggered automatically on the librdkafka background thread;
see the SASL background-callback sketch after this list.
- `rd_kafka_queue_get_background()` now creates the background thread
if not already created.
- Added
`rd_kafka_consumer_close_queue()` and `rd_kafka_consumer_closed()`.
This allows applications and language bindings to implement asynchronous
consumer close; see the asynchronous close sketch after this list.
- Bundled zlib upgraded to version 1.2.12.
- Bundled OpenSSL upgraded to 1.1.1n.
- Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.
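A minimal sketch of how the background SASL callbacks might be wired up; the `create_client()` helper and `my_oauthbearer_refresh_cb()` callback are hypothetical, and exact usage may differ from this illustration:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* my_oauthbearer_refresh_cb is a hypothetical application callback that
 * fetches a token and sets it with rd_kafka_oauthbearer_set_token(). */
extern void my_oauthbearer_refresh_cb (rd_kafka_t *rk,
                                       const char *oauthbearer_config,
                                       void *opaque);

static rd_kafka_t *create_client (rd_kafka_conf_t *conf,
                                  char *errstr, size_t errstr_size) {
        rd_kafka_t *rk;
        rd_kafka_error_t *error;

        rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                conf, my_oauthbearer_refresh_cb);

        /* Route SASL events to a separate queue so they can be served
         * by the background thread instead of the main queue. */
        rd_kafka_conf_enable_sasl_queue(conf, 1);

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, errstr_size);
        if (!rk)
                return NULL;

        /* Let librdkafka's background thread trigger the refresh callback
         * automatically; no rd_kafka_poll() is needed for this anymore. */
        error = rd_kafka_sasl_background_callbacks_enable(rk);
        if (error) {
                fprintf(stderr,
                        "Enabling background SASL callbacks failed: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        }

        return rk;
}
```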
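A rough sketch of an asynchronous consumer close built on the new APIs; the `close_consumer_async()` helper is hypothetical and the queue handling is simplified:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* close_consumer_async() shows the shape of an asynchronous close; a real
 * binding would serve the queue from its own event loop and handle
 * RD_KAFKA_EVENT_REBALANCE (e.g. by calling rd_kafka_assign() /
 * rd_kafka_incremental_unassign()) instead of merely destroying events. */
static void close_consumer_async (rd_kafka_t *rk) {
        rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
        rd_kafka_error_t *error;

        /* Start the close sequence; close-related events are emitted on
         * the provided queue rather than handled by a blocking
         * rd_kafka_consumer_close(). */
        error = rd_kafka_consumer_close_queue(rk, rkqu);
        if (error) {
                fprintf(stderr, "Failed to start consumer close: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
                rd_kafka_queue_destroy(rkqu);
                return;
        }

        /* Serve the queue until the consumer is fully closed. */
        while (!rd_kafka_consumer_closed(rk)) {
                rd_kafka_event_t *rkev = rd_kafka_queue_poll(rkqu, 100);
                if (rkev)
                        rd_kafka_event_destroy(rkev);
        }

        rd_kafka_queue_destroy(rkqu);
        rd_kafka_destroy(rk);
}
```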
Fixes
General fixes
- Fix various 1 second delays due to internal broker threads blocking on IO
even though there are events to handle.
These delays could be seen randomly in any of the non produce/consume
request APIs, such as `commit_transaction()`, `list_groups()`, etc.
- Windows: some applications would crash with an error message like
`no OPENSSL_Applink()` written to the console if `ssl.keystore.location`
was configured.
This regression was introduced in v1.8.0 due to the use of vcpkg and how the
keystore file was read. #3554.
- Windows 32-bit only: 64-bit atomic reads were in fact not atomic and could
in rare circumstances yield incorrect values.
One manifestation of this issue was the `max.poll.interval.ms` consumer
timer expiring even though the application was polling according to profile.
Fixed by @WhiteWind (#3815).
- `rd_kafka_clusterid()` would previously fail with a timeout if
called on a cluster with no visible topics (#3620).
The clusterid is now returned as soon as metadata has been retrieved.
- Fix hang in `rd_kafka_list_groups()` if there are no available brokers
to connect to (#3705).
- Millisecond timeouts (`timeout_ms`) in various APIs, such as `rd_kafka_poll()`,
were limited to roughly 36 hours before wrapping. (#3034)
- If a metadata request triggered by
`rd_kafka_metadata()` or consumer group rebalancing
encountered a non-retriable error, it would not be propagated to the caller, thus
causing a stall or timeout; this has now been fixed. (@aiquestion, #3625)
- Admin API `DeleteGroups()` and `DeleteConsumerGroupOffsets()`:
if the given coordinator connection was not up by the time these calls were
initiated, and the first connection attempt failed, then no further connection
attempts were performed, ultimately leading to the calls timing out.
This is now fixed by retrying the connection to the group coordinator
until the connection is successful or the call times out.
Additionally, the coordinator will now be re-queried once per second, until
the coordinator comes up or the call times out, to detect coordinator changes.
- Mock cluster: `rd_kafka_mock_broker_set_down()` would previously
accept and then disconnect new connections; it now refuses new connections.
Consumer fixes
- `rd_kafka_offsets_store()` (et al.) will now return an error for any
partition that is not currently assigned (through `rd_kafka_*assign()`).
See Upgrade considerations above for more information.
- `rd_kafka_*assign()` will now reset/clear the stored offset.
See Upgrade considerations above for more information.
- `seek()` followed by `pause()` would overwrite the seeked offset when
later calling `resume()`. This is now fixed. (#3471).
Note: Avoid storing offsets (`offsets_store()`) after calling
`seek()` as this may later interfere with resuming a paused partition;
instead store offsets prior to calling seek, as sketched after this list.
- An `ERR_MSG_SIZE_TOO_LARGE` consumer error would previously be raised
if the consumer received a maximum sized FetchResponse only containing
(transaction) aborted messages with no control messages. The fetching did
not stop, but some applications would terminate upon receiving this error.
No error is now raised in this case. (#2993)
Thanks to @jacobmikesell for providing an application to reproduce the
issue.
- The consumer no longer backs off the next fetch request (default 500ms) when
the parsed fetch response is truncated (which is a valid case).
This should speed up the message fetch rate in case of maximum sized
fetch responses.
- Fix consumer crash (`assert: rkbuf->rkbuf_rkb`) when parsing
malformed JoinGroupResponse consumer group metadata state.
- Fix crash (`cant handle op type`) when using `consume_batch_queue()` (et al.)
and an OAUTHBEARER refresh callback was set.
The callback is now triggered by the consume call. (#3263)
- Fix `partition.assignment.strategy` ordering when multiple strategies are configured.
If there is more than one eligible strategy, preference is determined by the
configured order of strategies. Partitions are now assigned to group members
according to that strategy order preference. (#3818) See the configuration
sketch after this list.
- Any form of `unassign*()` (absolute or incremental) is now allowed during
consumer close rebalancing, and they are all treated as absolute unassigns.
(@kevinconaway)
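An illustrative sketch of the recommended ordering, store offsets before a later `seek()`, not after it; the `store_then_seek()` helper is hypothetical and error handling is simplified:

```c
#include <librdkafka/rdkafka.h>

/* store_then_seek(): `rk` is an assigned consumer and `rkmessage` a message
 * returned by rd_kafka_consumer_poll(). */
static void store_then_seek (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
        rd_kafka_topic_partition_list_t *offsets =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *rktpar =
                rd_kafka_topic_partition_list_add(
                        offsets, rd_kafka_topic_name(rkmessage->rkt),
                        rkmessage->partition);
        rd_kafka_error_t *error;

        /* Store the offset to be committed (next message to consume)
         * before any seek... */
        rktpar->offset = rkmessage->offset + 1;
        rd_kafka_offsets_store(rk, offsets);

        /* ...and only then seek, here back to the beginning of the
         * partition. Storing after seek() may interfere with resuming a
         * paused partition, as noted above. */
        rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
        error = rd_kafka_seek_partitions(rk, offsets, 5000 /*timeout ms*/);
        if (error)
                rd_kafka_error_destroy(error);

        rd_kafka_topic_partition_list_destroy(offsets);
}
```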
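A short configuration sketch of multiple assignment strategies; the `make_consumer_conf()` helper and the chosen strategy values are illustrative only:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Configure two assignment strategies; with this fix the configured order
 * determines preference, so "roundrobin" is preferred when all group
 * members support it and "range" is only used otherwise. */
static rd_kafka_conf_t *make_consumer_conf (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "partition.assignment.strategy",
                              "roundrobin,range",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "%s\n", errstr);

        return conf;
}
```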
Transactional producer fixes
- Fix message loss in idempotent/transactional producer.
A corner case has been identified that may cause idempotent/transactional
messages to be lost despite being reported as successfully delivered:
During cluster instability a restarting broker may report existing topics
as non-existent for some time before it is able to acquire up to date
cluster and topic metadata.
If an idempotent/transactional producer updates its topic metadata cache
from such a broker the producer will consider the topic to be removed from
the cluster and thus remove its local partition objects for the given topic.
This also removes the internal message sequence number counter for the given
partitions.
If the producer later receives proper topic metadata for the cluster the
previously "removed" topics will be rediscovered and new partition objects
will be created in the producer. These new partition objects, with no
knowledge of previous incarnations, would start counting partition messages
at zero again.
If new messages were produced for these partitions by the same producer
instance, the same message sequence numbers would be sent to the broker.
If the broker still maintains state for the producer's PID and Epoch it could
deem that these messages with reused sequence numbers had already been
written to the log and treat them as legitimate duplicates.
To the producer it would thus seem that these new messages were successfully
written to the partition log by the broker, when they were in fact discarded
as duplicates, leading to silent message loss.
The fix included in this release is to save the per-partition idempotency
state when a partition is removed, and then recover and use that saved
state if the partition comes back at a later time.
- The transactional producer would retry (re)initializing its PID if a
`PRODUCER_FENCED` error was returned from the
broker (added in Apache Kafka 2.8), which could cause the producer to
seemingly hang.
This error code is now correctly handled by raising a fatal error.
- If the given group coordinator connection was not up by the time
`send_offsets_to_transactions()` was called, and the first connection
attempt failed, then no further connection attempts were performed, ultimately
leading to `send_offsets_to_transactions()` timing out, and possibly
also the transaction timing out on the transaction coordinator.
This is now fixed by retrying the connection to the group coordinator
until the connection is successful or the call times out.
Additionally, the coordinator will now be re-queried once per second, until
the coordinator comes up or the call times out, to detect coordinator changes.
Producer fixes
- Improved producer queue wakeup scheduling. This should significantly
decrease the number of wakeups and thus syscalls for high message rate
producers. (#3538, #2912)
- The logic for enforcing that `message.timeout.ms` is greater than
an explicitly configured `linger.ms` was incorrect: instead of
erroring out early, the lingering time was automatically adjusted to the
message timeout, ignoring the configured `linger.ms`.
This has now been fixed so that an error is returned when instantiating the
producer. Thanks to @larry-cdn77 for analysis and test-cases. (#3709)
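A small illustrative program (not from the release notes) showing the new behavior: with `linger.ms` explicitly set higher than `message.timeout.ms`, producer creation is expected to fail rather than the linger time being silently adjusted:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk;

        /* Deliberately conflicting settings: linger.ms > message.timeout.ms */
        rd_kafka_conf_set(conf, "message.timeout.ms", "5000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "linger.ms", "10000",
                          errstr, sizeof(errstr));

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                /* Expected with this fix: instantiation fails here instead
                 * of linger.ms being capped behind the application's back. */
                fprintf(stderr, "Producer creation failed: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rd_kafka_destroy(rk);
        return 0;
}
```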
Checksums
Release asset checksums:
- v1.9.0.zip SHA256
a2d124cfb2937ec5efc8f85123dbcfeba177fb778762da506bfc5a9665ed9e57
- v1.9.0.tar.gz SHA256
59b6088b69ca6cf278c3f9de5cd6b7f3fd604212cd1c59870bc531c54147e889