github NuclearAPK/Simple-Kafka_Adapter v1.7.0

3 days ago

Simple-Kafka_Adapter 1.7.0 Release Notes

🎉 Основные возможности

🔄 Транзакционная поддержка (Exactly-Once семантика)

Версия 1.7.0 включает полную поддержку транзакций Kafka с гарантией Exactly-Once доставки сообщений.

Новые методы:

ИнициализироватьТранзакционногоПродюсера / InitTransactionalProducer

Компонента.ИнициализироватьТранзакционногоПродюсера("localhost:9092", "my-transaction-id");

НачатьТранзакцию / BeginTransaction

Компонента.НачатьТранзакцию();

ЗафиксироватьТранзакцию / CommitTransaction и ОтменитьТранзакцию / AbortTransaction

Если ВсеУспешно Тогда
    Компонента.ЗафиксироватьТранзакцию();
Иначе
    Компонента.ОтменитьТранзакцию();
КонецЕсли;

ОтправитьОфсетыВТранзакцию / SendOffsetsToTransaction

Компонента.ОтправитьОфсетыВТранзакцию(ОфсетыJSON, "consumer-group-id");

Применение:

  • 💰 Финансовые операции - гарантия отсутствия дубликатов в платежах и заказах
  • 🔗 Атомарные операции - все сообщения записываются вместе или не записываются вовсе
  • ♻️ Read-Process-Write - чтение → обработка → запись без потери данных при сбоях
  • Строгая консистентность - координация между топиками

📦 Пакетная отправка сообщений

ОтправитьПакетСообщений / ProduceBatch

МассивСообщенийJSON = "[
    |{""message"": ""Сообщение 1"", ""key"": ""key1"", ""partition"": 0},
    |{""message"": ""Сообщение 2"", ""key"": ""key2"", ""partition"": 1},
    |{""message"": ""Сообщение 3""}
    |]";

КоличествоОтправленных = Компонента.ОтправитьПакетСообщений(МассивСообщенийJSON, "my-topic");

Применение:

  • Высокая производительность - эффективная отправка множества сообщений
  • 📊 Массовая загрузка - импорт больших объемов данных в Kafka
  • 🔄 Синхронизация - передача данных между системами пакетами

🎯 Consumer Assignment - Явное назначение партиций

Поддержка Consumer Assignment - возможность явного назначения партиций консьюмеру без использования автоматической подписки. Это дает полный контроль над тем, какие партиции обрабатывает консьюмер.

Новые методы:

НазначитьПартиции / Assign

JSONПартиции = "[
    |{""topic"": ""my-topic"", ""partition"": 0, ""offset"": 100},
    |{""topic"": ""my-topic"", ""partition"": 2, ""offset"": 0}
    |]";

Компонента.НазначитьПартиции(JSONПартиции);

ПолучитьНазначение / GetAssignment

НазначениеJSON = Компонента.ПолучитьНазначение();
// Возвращает JSON с текущими назначенными партициями

ОтменитьНазначение / Unassign

Компонента.ОтменитьНазначение();
// Отменяет все назначенные партиции

Практические сценарии применения:

  • 🔄 Переобработка данных - начните чтение с определенного offset для повторной обработки сообщений
  • 🎯 Обработка конкретных партиций - при миграции или восстановлении данных
  • Параллельная обработка - запустите несколько независимых консьюмеров для разных партиций
  • 🧪 Тестирование - удобно тестировать обработку конкретных партиций
  • 🔧 Без ребалансировки - избежать автоматической ребалансировки consumer groups

👥 Управление группами консьюмеров

УдалитьГруппуКонсьюмеров / DeleteConsumerGroup

Если Компонента.УдалитьГруппуКонсьюмеров("localhost:9092", "old-consumer-group") Тогда
    Сообщить("Группа консьюмеров успешно удалена");
КонецЕсли;

СброситьОфсетыГруппыКонсьюмеров / ResetConsumerGroupOffsets

// Сброс на начало
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", "earliest");

// Сброс на конец
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", "latest");

// Сброс на временную метку
ВременнаяМетка = "1704067200000"; // 1 января 2024, 00:00:00 UTC
Компонента.СброситьОфсетыГруппыКонсьюмеров("localhost:9092", "my-group", "my-topic", ВременнаяМетка);

Применение:

  • 🗑️ Очистка ресурсов - удаление неиспользуемых consumer groups
  • ♻️ Повторная обработка - сброс офсетов для переобработки данных
  • 🔧 Восстановление - исправление после сбоев в обработке
  • 🔄 Миграция - переход между версиями приложения

🔍 Мониторинг и Health Check

ПроверитьДоступностьБрокера / PingBroker

Если Компонента.ПроверитьДоступностьБрокера("localhost:9092", 3000) Тогда
    Сообщить("Брокер доступен");
Иначе
    Сообщить("Брокер недоступен");
КонецЕсли;

Применение:

  • ✅ Health check перед запуском приложения
  • 🔍 Мониторинг состояния брокеров
  • 🔀 Автоматический failover при сбоях
  • 🛠️ Валидация конфигурации подключения

ПолучитьКоличествоСообщенийВПартиции / GetPartitionMessageCount

Количество = Компонента.ПолучитьКоличествоСообщенийВПартиции("localhost:9092", "my-topic", 0);
Сообщить("В партиции 0 содержится " + Количество + " сообщений");

Применение:

  • 📊 Мониторинг объема данных в партициях
  • 📈 Планирование ресурсов для обработки
  • ⚖️ Балансировка нагрузки между партициями
  • ✔️ Проверка успешной записи сообщений

📚 Улучшения документации

Новая документация

  • docs/transactions.md - полная документация по транзакционной поддержке с примерами Exactly-Once семантики
  • docs/producer.md - добавлена документация по пакетной отправке сообщений
  • docs/consumer.md - расширена документация по Consumer Assignment с подробными примерами
  • docs/admin.md - добавлена документация по управлению группами консьюмеров, health check и мониторингу
  • Практические сценарии применения для всех новых методов
  • Примеры кода на языке 1С для всех возможностей

🚀 Миграция с версии 1.6.x

Версия 1.7.0 полностью обратно совместима с версией 1.6.x. Все существующие методы работают без изменений.

Новые возможности требуют:

  1. Транзакционная поддержка - инициализация через ИнициализироватьТранзакционногоПродюсера()
  2. Пакетная отправка - инициализированный продюсер через ИнициализироватьПродюсера()
  3. Consumer Assignment - инициализированный консьюмер через ИнициализироватьКонсьюмера()
  4. Управление группами - прямая работа с брокером, не требует инициализации
  5. Мониторинг (Ping, MessageCount) - прямая работа с брокером, не требует инициализации

Важно:

  • Методы Подписаться() и НазначитьПартиции() взаимоисключающие - используйте только один из них
  • При использовании НазначитьПартиции() автоматическая ребалансировка consumer group не происходит
  • Транзакционный продюсер требует уникального transactional.id для каждого экземпляра
  • Транзакции снижают пропускную способность на 20-30% из-за дополнительных служебных записей
  • Для чтения транзакционных сообщений консьюмер должен иметь isolation.level=read_committed

📦 Загрузка

Скачать релиз можно на странице Releases

🙏 Благодарности

Спасибо всем участникам и пользователям проекта за вклад и обратную связь!

🔗 Ссылки

  • Полный Changelog
  • Документация Transactions (Транзакции)
  • Документация Producer
  • Документация Consumer
  • Документация Admin API
  • GitHub Repository

Simple-Kafka_Adapter - Native API компонент для работы с Apache Kafka из 1С:Предприятие 8.3

Don't miss a new Simple-Kafka_Adapter release

NewReleases is sending notifications on new releases.