- [Feature] Expose
`Producer#current_variant` as a public method. It returns the variant active for the current dispatch on the current fiber - the custom variant while inside a `#with`/`#variant`-wrapped call, otherwise the producer's default variant - so middleware and instrumentation listeners running synchronously within a dispatch can read the effective per-dispatch settings (`topic_config`, `max_wait_timeout`, `default?`). The lookup is fiber-local and dispatch-scoped: outside a variant-wrapped call (or from an asynchronous delivery callback) it returns the default variant.
- [Enhancement] Stop allocating one interpolated string per message in
`LoggerListener` batch produce handlers. The quoted topic strings were only ever counted (quoting is a 1:1 mapping), never displayed, so counting the raw topic values yields the identical number with zero string allocations - relevant for large `produce_many_*` batches with the default logger listener attached.
- [Enhancement] Use
`Array#concat` in `Producer#buffer_many` instead of appending messages one by one.
- [Enhancement] Skip building the
`message.acknowledged` instrumentation payload in the delivery callback when nothing is subscribed to that event. The notifications bus already short-circuits on empty listeners, but only after the payload hash was allocated - once per delivered message on the polling thread. Mirrors the listener guard already used by the statistics callback. Late subscribers keep working as the check happens on each emission.
- [Enhancement] Resolve the fiber-local variant once per
`#produce` call and once per `#produce_many_sync` wait phase instead of re-resolving it for every usage and for every waited delivery handle. For a 1,000-message sync batch this removes ~2,000 redundant fiber-local lookups.
- [Enhancement] Do not allocate the fiber-local variants hash on the
`Producer#current_variant` read path. Previously every fiber that produced messages got a Hash pinned to it for the fiber's lifetime (per producer use), even when variants were never used - wasteful under fiber-per-request servers (Falcon, async). The hash is now only created by variant wrapper methods that actually need to write to it.
- [Enhancement] Cache the variant validation contract in a constant instead of instantiating a new
`Contracts::Variant` on every `Producer#with`/`Producer#variant` call (mirrors the existing `Transactions::CONTRACT` pattern).
- [Enhancement] Cache the tombstone validation contract in a constant instead of instantiating a new
`Contracts::Tombstone` per tombstone message, removing per-message allocations in the `tombstone_*` APIs (mirrors the existing `Transactions::CONTRACT` pattern).
- [Enhancement] Replace explicit
`Warning[:performance]` opt-in with a dynamic approach using `Warning.categories` (available since Ruby 3.4) to automatically enable all stable opt-in warning categories in the test suite, including `:strict_unused_block` introduced in Ruby 4.0.
- [Fix] Prevent a deadlock between a transactional single-message dispatch and
`#close`. A single `produce_sync`/`produce_async` on a transactional producer incremented the operations counter (which `#close` drains while holding `@transaction_mutex`) before acquiring `@transaction_mutex` for its per-message transaction - an inverted lock order. A dispatch that had counted itself but not yet taken `@transaction_mutex` could deadlock a concurrent `#close` permanently (the close wait loop has no timeout). Transactional dispatches now take `@transaction_mutex` before the operation is counted, matching `#close`'s lock order (`@transaction_mutex` -> `@operating_mutex` -> operations counter).
- [Fix] Prevent a deadlock (`ThreadError: deadlock; recursive locking`) when closing an idempotent producer (with `reload_on_idempotent_fatal_error` enabled) that has buffered messages whose final flush surfaces a fatal librdkafka error. `#close` performs the final flush while already holding `@operating_mutex`, and the idempotent fatal-error reload tried to re-acquire that same mutex, leaving the producer stuck in `:closing` with the native client leaked. The idempotent reload is now skipped on the closing path, and the final buffer flush is best-effort so client teardown always completes.
- [Fix] Make concurrent idempotent fatal-error reload thread-safe. When several threads shared an idempotent producer (with
`reload_on_idempotent_fatal_error` enabled), a single fatal librdkafka condition failed all their in-flight produces at once and each entered the reload path; the second reload ran `reload!` after the first had already reset `@client` to `nil`, raising `NoMethodError`. The idempotent reload now bails out if another thread already reloaded (mirroring the transactional path's `return if @status.configured?` guard). Additionally, `Status#active?` now classifies the lifecycle from a single atomic read and `Producer#ensure_active!` branches on one snapshot, so a concurrent `configured -> connected` transition during a reload can no longer make `ensure_active!` raise `StatusInvalidError` for a valid, active producer.
- [Fix] Stop
`#flush_async`/`#flush_sync` from silently dropping valid buffered messages when the dispatch fails. `#flush` removes the batch from the internal buffer before dispatching it, and a failure (a single invalid message failing validation before anything is sent, or a mid-batch inline error such as queue full) previously discarded the entire taken batch - the removed messages were never restored. A failed flush now re-buffers the messages that never reached librdkafka (the whole batch on validation failure or on a transactional rollback, the unsent remainder otherwise) so they can be retried instead of being lost.
- [Fix] Make
`Producer#close` fork-safe so the GC finalizer inherited by a forked child can no longer close the parent's client. `#client` registers an `ObjectSpace` finalizer that calls `#close`; that finalizer is inherited across `fork`, and a child that inherited a used producer, never touched it, and exited normally would run `#close` in the child - flushing and closing (with the real rdkafka client, `rd_kafka_destroy` on a fork-inherited handle, i.e. undefined behavior) a client owned by the parent. `#close` now detects when it runs in a process other than the one that built the client, drops the inherited references and finalizer, and returns without touching the native client (matching the existing fork guard on the `#client` path).
- [Fix] Guard the internal buffer appends in
`Producer#buffer` and `Producer#buffer_many` with `@buffer_mutex`. The appends mutated the shared `@messages` buffer without the lock that `flush`/`purge`/`close` hold while swapping it for a fresh array, so a concurrent swap landing between reading `@messages` and appending could drop the message into an orphaned array that is never dispatched - silently losing buffered messages in the documented "buffer in one thread, flush in another" pattern.
- [Fix] Stop a nested same-producer variant call from clobbering the outer variant inside a variant
`transaction` block. `transaction` is the only variant-wrapped method that yields user code, so a variant call nested inside it (another `variant.produce_*`, or a raw producer dispatch in the same scope) used to delete the shared `Fiber.current.waterdrop_clients` entry on return, making the rest of the block silently fall back to the default variant and dispatch with default `topic_config` (timeouts, compression, partitioner) instead of the altered one. The wrapper now saves and restores the previous entry instead of unconditionally deleting it (still deleting when there was none, so the fiber-local hash does not accumulate stale keys).
- [Fix] Stop
`ConnectionPool#shutdown` and `#reload` from silently dropping in-flight messages. Both closed every pooled producer with `close!` (force), which flushes for the max wait timeout and then purges whatever has not drained - so on a slow or unreachable broker, queued `produce_async` messages were cancelled and lost with no delivery report. They now close producers gracefully by default (`#reload` always; `#shutdown` unless called with the new `force: true`), letting messages flush instead of being purged. Pass `pool.shutdown(force: true)` to keep the old force-and-purge behavior.
- [Fix] Close a race in the FD poller where a producer registered while the last one was being torn down could be left permanently unpolled (sync produces hang until timeout, async deliveries are never acknowledged). The poller thread decided to exit (last producer unregistered) and cleared its thread reference in two separate, unsynchronized steps, so a
`register` landing in that gap saw the still-alive exiting thread, skipped starting a fresh one, and then had its producer's state closed by the exiting thread's cleanup. The thread now decides to stop and clears its reference in a single mutex section, so a racing `register` either keeps it running or starts a fresh thread; and the exit cleanup runs only on an abnormal exit, since a normal exit always leaves an empty registry and so can never close a producer registered in the gap.
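
The save-and-restore behavior described in the nested variant fix can be illustrated with a minimal, self-contained sketch. This is not WaterDrop's implementation: `VariantScope`, its `KEY`, and the `with`/`current` methods are hypothetical names, and a plain fiber-local hash (via `Thread.current[]`, which Ruby scopes per fiber) stands in for the real variants registry. It also shows a read path that never allocates the hash, in the spirit of the `Producer#current_variant` enhancement.

```ruby
# Hypothetical stand-in for a fiber-local variant registry (not WaterDrop code).
module VariantScope
  KEY = :demo_variants # hypothetical fiber-local storage key

  # Activates `variant` for `producer_id` on the current fiber while the block
  # runs, then restores the previous entry, or deletes the key if there was none.
  def self.with(producer_id, variant)
    registry = (Thread.current[KEY] ||= {}) # only the write path allocates
    had_previous = registry.key?(producer_id)
    previous = registry[producer_id]
    registry[producer_id] = variant
    yield
  ensure
    if had_previous
      registry[producer_id] = previous # nested call: hand back the outer variant
    else
      registry.delete(producer_id)     # outermost call: leave no stale key
    end
  end

  # Read path: returns `default` without creating the registry hash.
  def self.current(producer_id, default)
    registry = Thread.current[KEY]
    return default unless registry

    registry.fetch(producer_id, default)
  end
end

seen = []
VariantScope.with(:producer, :outer) do
  seen << VariantScope.current(:producer, :default)
  VariantScope.with(:producer, :inner) do
    seen << VariantScope.current(:producer, :default)
  end
  # With unconditional deletion this read would fall back to :default.
  seen << VariantScope.current(:producer, :default)
end
seen << VariantScope.current(:producer, :default)
```

In this sketch the read after the nested call still sees the outer variant, and the key is gone once the outermost call returns.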