Перейти к содержимому

Фотография

Ruby + Kafka


  • Авторизуйтесь для ответа в теме
Сообщений в теме: 7

#1 Denis38

Denis38

    Новый участник

  • Members
  • Pip
  • 16 сообщений

Отправлено 24 декабря 2018 - 13:20

Всем привет. Использую библиотеку ruby-kafka. Записываю сообщение в топик, но не могу прочитать из темы. Создаю consumer (указываю group_id) затем подписываюсь на топик и дальше пытаюсь вывести сообщения из топика, выдает такую ошибку: "Kafka::InconsistentGroupProtocol". Может кто-то уже сталкивался с такой проблемой и может подсказать как ее решить? Всем спасибо за помощь.


  • 0

#2 Little_CJIOH

Little_CJIOH

    Профессионал

  • Members
  • PipPipPipPipPipPip
  • 1 515 сообщений
  • ФИО:Власкин Павел
  • Город:Санкт-Петербург


Отправлено 24 декабря 2018 - 14:10

1. Убедится что версия кафки совместима с версией библиотеки.

2. Проверить конфигурацию топика, если у вас replication factor топика превышает количество нод, то топик будет вести себя очень странно.

3. Освоить консольные консьюмер и продюсер для экспериментов, проверить что с ними все работает.


  • 0

#3 Denis38

Denis38

    Новый участник

  • Members
  • Pip
  • 16 сообщений

Отправлено 24 декабря 2018 - 14:54

1. Убедится что версия кафки совместима с версией библиотеки.

2. Проверить конфигурацию топика, если у вас replication factor топика превышает количество нод, то топик будет вести себя очень странно.

3. Освоить консольные консьюмер и продюсер для экспериментов, проверить что с ними все работает.

Версии совместимы. Через консоль все работает нормально.
Могу прочитать топик только этой командой, только чтение продолжается бесконечно, т.к. не знаю какой командой выйти.
Нужно прочитать последние сообщение из кафки и вытащить от туда значение, может кто подскажет другие способы? 

 

kafka.each_message(topic: "greetings") do |message|
puts message.offset, message.key, message.value
end


  • 0

#4 Little_CJIOH

Little_CJIOH

    Профессионал

  • Members
  • PipPipPipPipPipPip
  • 1 515 сообщений
  • ФИО:Власкин Павел
  • Город:Санкт-Петербург


Отправлено 24 декабря 2018 - 16:07

1) попробуйте без явного указания группы

2) если это невозможно, копайте саму ошибку "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list."

Поскольку это первый консьюмер в группе - значит вторая половина описания, попробуйте явно задать протокол группы. Хотя я с ходу не нашел как.
Ну и покажите уже код, который дает эту ошибку.


  • 0

#5 Denis38

Denis38

    Новый участник

  • Members
  • Pip
  • 16 сообщений

Отправлено 25 декабря 2018 - 07:56

1) попробуйте без явного указания группы

2) если это невозможно, копайте саму ошибку "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list."

Поскольку это первый консьюмер в группе - значит вторая половина описания, попробуйте явно задать протокол группы. Хотя я с ходу не нашел как.
Ну и покажите уже код, который дает эту ошибку.

Попробовал изменить группу и все заработало.

Использую consumer.each_message |message| для считывания сообщений. Нужно вытащить определенное значение из сообщений и когда это значение будет равняться нужному остановить косьюмера. Не подскажите как это реализовать?


  • 0

#6 Little_CJIOH

Little_CJIOH

    Профессионал

  • Members
  • PipPipPipPipPipPip
  • 1 515 сообщений
  • ФИО:Власкин Павел
  • Город:Санкт-Петербург


Отправлено 25 декабря 2018 - 08:54

break ?


  • 0

#7 Denis38

Denis38

    Новый участник

  • Members
  • Pip
  • 16 сообщений

Отправлено 25 декабря 2018 - 15:12

break ?

Сделал consumer.stop если найдет в сообщении нужное значение. В кафку сообщение записывает сервис и если сервис не выполнил все шаги, то нужное значение (статус) так и не появится и тогда consumer не останавливается. Находится в ожидании сообщений, т. е. тест будет длиться бесконечно. Как обойти эту ситуацию? Или может я что-то не так понимаю


  • 0

#8 Spock

Spock

    Профессионал

  • Members
  • PipPipPipPipPipPip
  • 1 772 сообщений
  • ФИО:Роман

Отправлено 25 декабря 2018 - 17:03

 

 

Или может я что-то не так понимаю

возможно вы что-то не так понимаете

 

не надо тестировать функцию остановки консьюмера используя сервис который записывает статус в очередь, потом чтение из очереди - это слишком долго и хрупко и неэффективно. выглядит так что вы пытаетесь сделать тест на более высоком уровне чем это требуется

 

просто зафейкуйте очередь и выставьте нужный статус


  • 0


Количество пользователей, читающих эту тему: 0

0 пользователей, 0 гостей, 0 анонимных