Что пишут в блогах

Подписаться

Что пишут в блогах (EN)

Разделы портала

Онлайн-тренинги

.
Мастерство тестирования Kafka: лучшие практики и стратегии
23.04.2025 00:00

Автор: Хуссем Маатали (Houssem Maatali)
Оригинал статьи
Перевод: Ольга Алифанова

Тестирование с Apache Kafka – критически важная практика, позволяющая гарантировать надежность потоковой передачи данных и обработки событий в приложениях, созданных на платформе Apache Kafka. Оно включает в себя спектр тест-техник, включая юнит-тесты, интеграционные тесты, а также нагрузочные тесты – и все они нацелены на валидацию целостности данных, масштабируемости системы и устойчивости к падениям в экосистемах Kafka.

Это необходимый шаг при разработке устойчивых и надежных решений для обработки данных в реальном времени. Kafka Streams опирается на Kafka, чтобы выполнять множество операций. Для этого нам нужен кластер Kafka. У тестирования тут три основных стратегии.

1. TopologyTestDriver

Класс TopologyTestDriver предоставлен Apache Kafka в качестве компонента их тест-библиотеки для тестирования приложений Kafka Streams. Его основная задача – упростить юнит- и интеграционное тестирование, которое валидирует поведение Kafka Streams.

Ниже – детальное описание его применения и роли:

  1. Тестирование приложений Kafka Streams. Kafka Streams – это библиотека для обработки событий в реальном времени и создания приложений потоковой передачи данных. Чтобы убедиться в корректности функционирования приложений Kafka Streams, нужно тщательно их протестировать. TopologyTestDriver специально разработан для упрощения и оптимизации тестирования.
  2. Симуляция кластера Kafka. Одно из ключевых преимуществ TopologyTestDriver – это его способность симулировать кластер Kafka в памяти. Это устраняет нужду в реальном кластере Kafka в ходе тестирования, что упрощает юнит-тестирование – больше не нужно стартовать специальный кластер Kafka.
  3. Тестирование топологий Kafka Streams. TopologyTestDriver позволяет тестировать топологии Kafka Streams, обрабатывая имитируемые записи путем различных трансформационных процессов, от источников вроде топиков Kafka до стока, например, других топиков Kafka. Это делает возможными инъекцию тестовых данных и последующую верификацию результатов.
  4. Верификация результатов. Как только проведена инъекция тестовых данных в приложение Kafka Streams, становится возможным верифицировать, что результаты работы топологии соответствуют ожиданиям. Этот шаг проверяет, что операции обработки – например, фильтрация и трансформация, - работают правильно.
  5. Изоляция тестов. При использовании TopologyTestDriver тесты можно проводить изолированно. Это значит, что тесты можно запускать, не вмешиваясь в интеграционный или боевой кластер Kafka, поддерживая целостность и стабильность системы в целом.

Вот простой пример использования TopologyTestDriver в Java:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Topology topology = yourKafkaStreamsTopology();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
// Инъекция тестовых данных
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
inputTopic.pipeInput("key", "value");
// Чтение и валидация результатов
TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
KeyValue<String, String> result = outputTopic.readKeyValue();
// Выполнение проверки результатов
// Закрытие тест-драйвера
testDriver.close();

Если вкратце, TopologyTestDriver – незаменимый инструмент тестирования приложений Kafka Streams, так как позволяет имитировать кластер Kafka, проводить инъекцию тестовых данных и валидировать результаты в контролируемом тест-окружении. В результате он позволяет убедиться в правильном функционировании логики обработки потоков.

2. EmbeddedKafka

EmbeddedKafka – это инструмент тестирования, позволяющий запустить встроенный брокер Apache Kafka внутри ваших юнит- или интеграционных тестов. Это полезный инструмент для тестирования связанного с Kafka кода без необходимости настраивать и управлять отдельным тест-кластером Kafka. Он часто используется в тест-фреймворках Java и Scala.

Ниже – объяснение, как EmbeddedKafka работает и чем полезен:

  1. Тестирование кода Kafka. Kafka – распределенная платформа потоковой передачи, и при разработке приложений на основе Kafka очень важно тщательно проверить, как ваш код взаимодействует с топиками и брокерами Kafka. Опция EmbeddedKafka упрощает процесс, предлагая удобный подход к тестированию на вашем специализированном тест-окружении.
  2. Упрощение тестирования. Конфигурация настоящего кластера Kafka для тестирования может стать сложной и длительной задачей. EmbeddedKafka упрощает этот процесс, позволяя запускать копию брокера Kafka внутри тест-процесса, соответственно упрощая разработку и запуск тестов, задействующих Kafka.
  3. Брокер Kafka в памяти. Встроенный брокер Kafka – это копия, которая существует только в памяти, и это дает возможность программно запускать и прерывать его внутри тест-кейсов. По сути он дает легкое Kafka-окружение, специально предназначенное для тестирования.
  4. Возможности настройки. Встроенный брокер Kafka можно настраивать для соответствия требованиям тест-сценариев. Возможна гибкая настройка таких параметров, как количества партиций брокера, номеров портов, и других относящихся к тестам свойств.
  5. Интеграция с клиентами Kafka. Встроенный брокер Kafka бесшовно интегрируется с клиентами Kafka, включая поставщиков и потребителей. Это позволяет оценить поведение кода, создавая и принимая сообщения, как и на реальном кластере Kafka.

Вот простой пример применения EmbeddedKafka в тесте Scala:

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest._
class MyKafkaTest extends FlatSpec with Matchers with EmbeddedKafka {
val config = EmbeddedKafkaConfig(
kafkaPort = 12345,
zooKeeperPort = 56789
)
"My Kafka code" should "produce and consume messages" in {
// Запуск встроенного брокера Kafka с заданными настройками
EmbeddedKafka.start()
// Выполнение связанного с Kafka тестирования, например, генерация и потребление сообщений
// Остановка встроенного брокера Kafka, когда тестирование завершено
EmbeddedKafka.stop()
}
}

В этом примере EmbeddedKafka интегрируется со Scala-фреймворком тестирования (ScalaTest), и запускает/прерывает встроенный брокер Kafka со специальными настройками тест-кейса.

Если вкратце, EmbeddedKafka – полезная утилита тестирования для запуска встроенного брокера Kafka внутри тестов, что дает контролируемое окружение для тестирования связанного с Kafka кода без нужды в полноценном кластере Kafka.

3. Testcontainers

Testcontainers – мощный инструмент для запуска служб, вроде Kafka, в контейнерах в ходе тестов. Это позволяет создавать и управлять изолированными, одноразовыми копиями Kafka в тест-кейсах. Ниже приведу пример использования Apache Kafka с Testcontainers на Java.

Для начала нужно включить Testcontainers и модуль Kafka в зависимости проекта. Вот пример для Maven:

<dependencies>
<! - Other dependencies →
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.0</version> <! - Replace with the latest version ->
</dependency>
</dependencies>

Теперь можно использовать Testcontainers для запуска контейнера Kafka для тестов. Ниже – пример реализации на Java с использованием JUnit:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTest {
@ClassRule
public static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")).withNetwork(Network.SHARED).withNetworkAliases("kafka");
@Test
public void testKafkaContainer() throws ExecutionException, InterruptedException {
kafkaContainer.start();
// Получение bootstrapServers для брокера Kafka
String bootstrapServers = kafkaContainer.getBootstrapServers();
// Использование bootstrapServers в настройках поставщика/потребителя Kafka
// Пример: создание AdminClient для управления топиками Kafka
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(properties);
// Выполнение связанного с Kafka тестирования
kafkaContainer.stop();
}
}

В этом примере мы используем Testcontainers для запуска контейнера Kafka на основании образа `confluentinc/cp-kafka`. Вы можете заменить его любым необходимым образом Kafka. Мы также указываем общую сеть, чтобы другие контейнеры могли взаимодействовать с Kafka-контейнером. `NetworkAliases` “kafka” можно использовать для доступа к брокеру Kafka внутри сети.

Как только контейнер Kafka успешно инициирован, можно получить адрес начальных серверов. Этот адрес затем можно использовать для настройки поставщика или потребителя Kafka, обеспечив связанное с Kafka тестирование в кейсах. И, наконец, когда ваши тесты завершены, нужно прекратить операции контейнера Kafka.

Критически важно убедиться, что вы настраиваете свои клиенты Kafka так, что они максимизируют пользу адреса ‘bootstrapServers’ из Kafka-контейнера. Если вы следуете этому подходу, гарантирую, что ваши Kafka-клиенты будут взаимодействовать с Kafka-брокером внутри контейнера в ходе тестирования.

Ссылки

Чтобы узнать больше о тестировании Apache Kafka, посмотрите эти достойные доверия источники:

1- Confluent’s Blog on Kafka Streams Testing with TopologyTestDriver:

Погрузитесь в тестирование Kafka Streams при помощи TopologyTestDriver: https://www.confluent.io/fr-fr/blog/test-kafka-streams-with-topologytestdriver/

2- Документация Spring Kafka EmbeddedKafka:

Изучите тестирование в экосистеме Spring Kafka: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/test/context/EmbeddedKafka.html

3- TestContainers: руководство для начинающих
Откройте для себя тестирование
Kafka и других контейнеров с TestContainers: https://testcontainers.com/getting-started/

Обсудить в форуме