Simple-Kafka_Adapter 1.7.0 Release Notes
🎉 Основные возможности
🔄 Транзакционная поддержка (Exactly-Once семантика)
Версия 1.7.0 включает полную поддержку транзакций Kafka с гарантией Exactly-Once доставки сообщений.
Новые методы:
ИнициализироватьТранзакционногоПродюсера / InitTransactionalProducer
Компонента.ИнициализироватьТранзакционногоПродюсера("localhost:9092", "my-transaction-id");
НачатьТранзакцию / BeginTransaction
Компонента.НачатьТранзакцию();
ЗафиксироватьТранзакцию / CommitTransaction и ОтменитьТранзакцию / AbortTransaction
Если ВсеУспешно Тогда
Компонента.ЗафиксироватьТранзакцию();
Иначе
Компонента.ОтменитьТранзакцию();
КонецЕсли;
ОтправитьОфсетыВТранзакцию / SendOffsetsToTransaction
Компонента.ОтправитьОфсетыВТранзакцию(ОфсетыJSON, "consumer-group-id");
Применение:
- 💰 Финансовые операции - гарантия отсутствия дубликатов в платежах и заказах
- 🔗 Атомарные операции - все сообщения записываются вместе или не записываются вовсе
- ♻️ Read-Process-Write - чтение → обработка → запись без потери данных при сбоях
- ✅ Строгая консистентность - координация между топиками
📦 Пакетная отправка сообщений
ОтправитьПакетСообщений / ProduceBatch
МассивСообщенийJSON = "[
|{""message"": ""Сообщение 1"", ""key"": ""key1"", ""partition"": 0},
|{""message"": ""Сообщение 2"", ""key"": ""key2"", ""partition"": 1},
|{""message"": ""Сообщение 3""}
|]";
КоличествоОтправленных = Компонента.ОтправитьПакетСообщений(МассивСообщенийJSON, "my-topic");
Применение:
- ⚡ Высокая производительность - эффективная отправка множества сообщений
- 📊 Массовая загрузка - импорт больших объемов данных в Kafka
- 🔄 Синхронизация - передача данных между системами пакетами
🎯 Consumer Assignment - Явное назначение партиций
Поддержка Consumer Assignment - возможность явного назначения партиций консьюмеру без использования автоматической подписки. Это дает полный контроль над тем, какие партиции обрабатывает консьюмер.
Новые методы:
НазначитьПартиции / Assign
JSONПартиции = "[
|{""topic"": ""my-topic"", ""partition"": 0, ""offset"": 100},
|{""topic"": ""my-topic"", ""partition"": 2, ""offset"": 0}
|]";
Компонента.НазначитьПартиции(JSONПартиции);
ПолучитьНазначение / GetAssignment
НазначениеJSON = Компонента.ПолучитьНазначение();
// Возвращает JSON с текущими назначенными партициями
ОтменитьНазначение / Unassign
Компонента.ОтменитьНазначение();
// Отменяет все назначенные партиции
Практические сценарии применения:
- 🔄 Переобработка данных - начните чтение с определенного offset для повторной обработки сообщений
- 🎯 Обработка конкретных партиций - при миграции или восстановлении данных
- ⚡ Параллельная обработка - запустите несколько независимых консьюмеров для разных партиций
- 🧪 Тестирование - удобно тестировать обработку конкретных партиций
- 🔧 Без ребалансировки - избежать автоматической ребалансировки consumer groups
👥 Управление группами консьюмеров
УдалитьГруппуКонсьюмеров / DeleteConsumerGroup
Если Компонента.УдалитьГруппуКонсьюмеров("localhost:9092", "old-consumer-group") Тогда
Сообщить("Группа консьюмеров успешно удалена");
КонецЕсли;
СброситьОфсетыГруппыКонсьюмеров / ResetConsumerGroupOffsets
// Сброс на начало
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", "earliest");
// Сброс на конец
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", "latest");
// Сброс на временную метку
ВременнаяМетка = "1704067200000"; // 1 января 2024, 00:00:00 UTC
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", ВременнаяМетка);
Применение:
- 🗑️ Очистка ресурсов - удаление неиспользуемых consumer groups
- ♻️ Повторная обработка - сброс офсетов для переобработки данных
- 🔧 Восстановление - исправление после сбоев в обработке
- 🔄 Миграция - переход между версиями приложения
🔍 Мониторинг и Health Check
ПроверитьДоступностьБрокера / PingBroker
Если Компонента.ПроверитьДоступностьБрокера("localhost:9092", 3000) Тогда
Сообщить("Брокер доступен");
Иначе
Сообщить("Брокер недоступен");
КонецЕсли;
Применение:
- ✅ Health check перед запуском приложения
- 🔍 Мониторинг состояния брокеров
- 🔀 Автоматический failover при сбоях
- 🛠️ Валидация конфигурации подключения
ПолучитьКоличествоСообщенийВПартиции / GetPartitionMessageCount
Количество = Компонента.ПолучитьКоличествоСообщенийВПартиции("localhost:9092", "my-topic", 0);
Сообщить("В партиции 0 содержится " + Количество + " сообщений");
Применение:
- 📊 Мониторинг объема данных в партициях
- 📈 Планирование ресурсов для обработки
- ⚖️ Балансировка нагрузки между партициями
- ✔️ Проверка успешной записи сообщений
📚 Улучшения документации
Новая документация
- docs/transactions.md - полная документация по транзакционной поддержке с примерами Exactly-Once семантики
- docs/producer.md - добавлена документация по пакетной отправке сообщений
- docs/consumer.md - расширена документация по Consumer Assignment с подробными примерами
- docs/admin.md - добавлена документация по управлению группами консьюмеров, health check и мониторингу
- Практические сценарии применения для всех новых методов
- Примеры кода на языке 1С для всех возможностей
🚀 Миграция с версии 1.6.x
Версия 1.7.0 полностью обратно совместима с версией 1.6.x. Все существующие методы работают без изменений.
Новые возможности требуют:
- Транзакционная поддержка - инициализация через
ИнициализироватьТранзакционногоПродюсера() - Пакетная отправка - инициализированный продюсер через
ИнициализироватьПродюсера() - Consumer Assignment - инициализированный консьюмер через
ИнициализироватьКонсьюмера() - Управление группами - прямая работа с брокером, не требует инициализации
- Мониторинг (Ping, MessageCount) - прямая работа с брокером, не требует инициализации
Важно:
- Методы
Подписаться()иНазначитьПартиции()взаимоисключающие - используйте только один из них - При использовании
НазначитьПартиции()автоматическая ребалансировка consumer group не происходит - Транзакционный продюсер требует уникального
transactional.idдля каждого экземпляра - Транзакции снижают пропускную способность на 20-30% из-за дополнительных служебных записей
- Для чтения транзакционных сообщений консьюмер должен иметь
isolation.level=read_committed
📦 Загрузка
Скачать релиз можно на странице Releases
🙏 Благодарности
Спасибо всем участникам и пользователям проекта за вклад и обратную связь!
🔗 Ссылки
- Полный Changelog
- Документация Transactions (Транзакции)
- Документация Producer
- Документация Consumer
- Документация Admin API
- GitHub Repository
Simple-Kafka_Adapter - Native API компонент для работы с Apache Kafka из 1С:Предприятие 8.3