Что пишут в блогах

Подписаться

Онлайн-тренинги

Загружаются...

Конференции

Разделы портала

Про инструменты

Лучшие вакансии

.
Тестирование параллельных процессов
18.05.2017 08:11

Автор: Николай Матюшенков

Оригинальная публикация: https://habrahabr.ru/post/327292/

Вы встречались с ошибками, которые возникают время от времени в продакшне, но никак не воспроизводятся локально? Бывает, изучаешь такой баг и вдруг понимаешь, что он проявляется только при одновременном параллельном выполнении скриптов. Изучив код, понимаешь как это исправить, чтобы такого больше не повторялось. Но на такое исправление хорошо бы написать тест…


В статье я расскажу о своем подходе к тестированию таких ситуаций. А также приведу несколько наглядных (и наверное даже классических) примеров багов, которые удобно протестировать с помощью этого подхода. Все примеры багов живые — то, что встречается в работе.

Забегая вперед сразу скажу, что в конце статьи будет ссылка на github, куда я выложил готовое решение, позволяющее тестировать параллельные консольные процессы легко и просто.

Пример номер один. Параллельное добавление одного и того же

Задача. У нас есть приложение с базой данных (PostgreSQL) и нам надо наладить импорт данных из сторонней системы. Допустим, есть таблица account (id, name) и связи идентификаторов с внешней системой в таблице account_import (id, external_id). Давайте набросаем простой механизм приема сообщений.

При приеме сообщения будем сперва проверять — есть ли такие записи у нас в базе. Если есть, то будем обновлять имеющиеся. Если нет, то будем добавлять в базу.

$data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'


try {

$connection->beginTransaction();


// Проверим, есть ли такая запись в базе

$stmt = $connection->prepare("SELECT id FROM account_import

WHERE external_id = :external_id");

$stmt->execute([

':external_id' => $data['id'],

]);

$row = $stmt->fetch();


usleep(100000); // 0.1 sec


// Если импортируемая запись в базе есть, то обновим ее

if ($row) {

$stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (

SELECT id FROM account_import WHERE external_id = :external_id

)");

$stmt->execute([

':name' => $data['name'],

':external_id' => $data['id'],

]);

$accountId = $row['id'];

}

// Иначе создадим новую запись

else {

$stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");

$stmt->execute([

':name' => $data['name'],

]);

$accountId = $connection->lastInsertId();


$stmt = $connection->prepare("INSERT INTO account_import (id, external_id)

VALUES (:id, :external_id)");

$stmt->execute([

':id' => $accountId,

':external_id' => $data['id'],

]);

}


$connection->commit();

}

catch (\Throwable $e) {

$connection->rollBack();

throw $e;

}

С первого взгляда выглядит хорошо. Но если данные в нашу систему могут передаваться не строго последовательно, тут можем столкнуться с проблемой. Задержка 0.1 секунды в этом примере нам нужна, чтобы гарантированно воспроизвести проблему. Что будет, если выполнить импорт одних и тех же данных параллельно? Вероятно, вместо того, чтобы данные были добавлены, а потом обновлены, будет попытка повторной вставки данных и, как следствие, ошибка нарушения первичного ключа в account_import. 

Чтобы исправить ошибку, ее хорошо бы сперва воспроизвести. А лучше всего — написать тест, который воспроизводит ошибку. Я решил для этого запускать команды асинхронно с помощью bash и написал простой скрипт для этого, который можно использовать не только в связке с PHP.

Идея проста — запускаем в фоне несколько экземпляров команд, потом ждем когда все они завершатся, и проверяем коды выполнения. Если среди кодов выполнения есть отличные от нуля, значит мы нашли баг. В упрощенном виде скрипт будет выглядеть так:


# Команда, которую будем проверять

COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import”


# PID-ы запущенных фоновых процессов

pids=()


# Результаты выполнения фоновых процессов

results=()


# Ожидаемые результаты выполнения фоновых процессов (нули)

expects=()


# Запустим процессы в фоне и перенаправим вывод в stderr

for i in $(seq 2)

do

eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2

done


# Ожидаем завершения каждого процесса и сохраняем результаты в $results

for pid in "${pids[@]}"

do

wait $pid

results+=($?)

expects+=(0)

echo -e '<<<' Process $pid finished 1>&2

done


# Сравним полученные результаты с ожидаемыми

result=`( IFS=$', '; echo "${results[*]}" )`

expect=`( IFS=$', '; echo "${expects[*]}" )`

if [ "$result" != "$expect" ]

then

exit 1

fi


Полную версию скрипта выложил на github.

На основе этой команды мы можем дописать к PHPUnit новые assert-ы. Тут уже все проще и я не буду подробно останавливаться на этом. Скажу только, что в вышеупомянутом проекте они реализованы. Чтобы их использовать достаточно подключить трейт AsyncTrait к вашему тесту.

Напишем такой тест.


use App\Command\Initializer;

use Mnvx\PProcess\AsyncTrait;

use Mnvx\PProcess\Command\Command;

use PHPUnit\Framework\TestCase;

use Symfony\Component\Console\Tester\CommandTester;


class ImportCommandTest extends TestCase

{

use AsyncTrait;


public function testImport()

{

$cli = Initializer::create();

$command = $cli->find('app:delete');


// Удаляем запись c external_id = 1,

// чтобы проверить случай параллельного добавления одной и той же записи

$commandTester = new CommandTester($command);

$commandTester->execute([

'externalId' => 1,

]);


$asnycCommand = new Command(

'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // Тестируемая команда

dirname(__DIR__), // Каталог, из которого будет запускаться команда

2 // Количество запускаемых экземпляров команд

);

// Запуск проверки

$this->assertAsyncCommand($asnycCommand);

}

}


В результате запуска теста получим такой вывод.


$ ./vendor/bin/phpunit

PHPUnit 6.1.1 by Sebastian Bergmann and contributors.


F 1 / 1 (100%)


Time: 230 ms, Memory: 6.00MB


There was 1 failure:


1) ImportCommandTest::testImport

Failed asserting that command

echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2)

executed in parallel.

Output:



>>> Process 18143 started

>>> Process 18144 started

Account 25 imported correctly



[Doctrine\DBAL\Exception\UniqueConstraintViolationException]

An exception occurred while executing 'INSERT INTO account_import (id, exte

rnal_id) VALUES (:id, :external_id)' with params ["26", 1]:

SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа

нарушает ограничение уникальности "account_import_pkey"

DETAIL:  Ключ "(external_id)=(1)" уже существует.


-------


app:import


<<< Process 18143 finished

<<< Process 18144 finished


.


/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19

/var/www/pprocess-playground/tests/ImportCommandTest.php:30


FAILURES!

Tests: 1, Assertions: 1, Failures: 1.


Причину мы уже обсудили. Теперь попробуем добавить принудительную блокировку параллельного выполнения фрагмента нашего скрипта (тут используется malkusch/lock).


$mutex = new FlockMutex(fopen(__FILE__, 'r'));

$mutex->synchronized(function () use ($connection, $data) {

// наш код из блока try

});


Тест пройден:


$ ./vendor/bin/phpunit

PHPUnit 6.1.1 by Sebastian Bergmann and contributors.


.                                                                   1 / 1 (100%)


Time: 361 ms, Memory: 6.00MB


OK (1 test, 1 assertion)


Этот и другие примеры я выложил на github, если вдруг кому-то понадобится.


Пример номер два. Подготовка данных в таблице

Этот пример будет немного интереснее. Допустим, у нас есть таблица пользователей users (id, name) и мы желаем хранить в таблице users_active (id) список активных в настоящий момент пользователей.

У нас будет команда, которая каждый раз будет удалять все записи из таблицы users_acitve и добавлять туда данные заново.


try {

$connection->beginTransaction();


$connection->prepare("DELETE FROM users_active")->execute();


usleep(100000); // 0.1 sec


$connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();


$connection->commit();

$output->writeln('<info>users_active refreshed</info>');

}

catch (\Throwable $e) {

$connection->rollBack();

throw $e;

}


Тут только с первого взгляда все хорошо. На самом же деле при параллельном запуске снова получим ошибку.

Напишем тест, чтобы ее воспроизвести.


use Mnvx\PProcess\AsyncTrait;

use Mnvx\PProcess\Command\Command;

use PHPUnit\Framework\TestCase;


class DetectActiveUsersCommandTest extends TestCase

{

use AsyncTrait;


public function testImport()

{

$asnycCommand = new Command(

'./cli app:detect-active-users', // Тестируемая команда

dirname(__DIR__), // Каталог, из которого будет запускаться команда

2 // Количество запускаемых экземпляров команд

);

// Запуск проверки

$this->assertAsyncCommand($asnycCommand);

}

}


Запускаем тест и видим текст ошибки:


$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php

PHPUnit 6.1.1 by Sebastian Bergmann and contributors.


F                                                                   1 / 1 (100%)


Time: 287 ms, Memory: 4.00MB


There was 1 failure:


1) DetectActiveUsersCommandTest::testImport

Failed asserting that command

./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2)

executed in parallel.

Output:



>>> Process 24717 started

>>> Process 24718 started

users_active refreshed

<<< Process 24717 finished



[Doctrine\DBAL\Exception\UniqueConstraintViolationException]

An exception occurred while executing 'INSERT INTO users_active (id) VALUES

(3), (5), (6), (10)':

SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа

нарушает ограничение уникальности "users_active_pkey"

DETAIL:  Ключ "(id)=(3)" уже существует.


-------


app:detect-active-users


<<< Process 24718 finished


.


/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19

/var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19


FAILURES!

Tests: 1, Assertions: 1, Failures: 1.


По тексту ошибки понятно, что снова INSERT выполняется параллельно и это приводит к нежелательным последствиям. Попробуем сделать блокировку на уровне записей — добавим строчку после старта транзакции:


$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();


Запускаем тест — ошибка ушла. Наш тест запускает два экземпляра процесса. Давайте увеличим в нашем тесте количество экземпляров до 3-х и посмотрим, что будет.


$asnycCommand = new Command(

'./cli app:detect-active-users', // Тестируемая команда

dirname(__DIR__), // Каталог, из которого будет запускаться команда

3 // Количество запускаемых экземпляров команд

);


И снова имеем ту же ошибку. В чем дело, мы же добавили блокировку?! Немного подумав, можно догадаться, что такая блокировка поможет только если в таблице users_active есть записи. В случае же, когда работают 3 процесса одновременно, получается картина такая — первый процесс получает блокировку. Второй и третий процесс ждут завершения транзакции первого процесса. Как только транзакция будет завершена, продолжат выполняться параллельно и второй и третий процесс, что приведет к нежелательным последствиям.

Чтобы починить, сделаем блокировку более общую. Например,


$connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();


Либо вместо DELETE мы могли просто воспользоваться TRUNCATE, которая блокирует всю таблицу.


Пример номер три. Deadlock

Бывает, что сама по себе команда не приводит к проблемам, но одновременный вызов двух разных команд, работающих с одними и теми же ресурсами приводит к проблемам. Найти причины таких багов бывает нелегко. Но если причина найдена, то тест написать точно стоит, чтобы избежать возвращения проблемы в будущем при внесении изменений в код.

Напишем пару таких команд. Это классический случай, когда возникает взаимная блокировка.

Первая команда сперва обновляет запись с, потом с.


try {

$connection->beginTransaction();


$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();


usleep(100000); // 0.1 sec


$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();


$connection->commit();

$output->writeln('<info>Completed without deadlocks</info>');

}

catch (\Throwable $e) {

$connection->rollBack();

throw $e;

}


Вторая команда сперва обновляет запись с, потом с.


try {

$connection->beginTransaction();


$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();


usleep(100000); // 0.1 sec


$connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();


$connection->commit();

$output->writeln('<info>Completed without deadlocks</info>');

}

catch (\Throwable $e) {

$connection->rollBack();

throw $e;

}


Тест будет выглядеть так.


use Mnvx\PProcess\AsyncTrait;

use Mnvx\PProcess\Command\CommandSet;

use PHPUnit\Framework\TestCase;


class DeadlockCommandTest extends TestCase

{

use AsyncTrait;


public function testImport()

{

$asnycCommand = new CommandSet(

[   // Тестируемые команды

'./cli app:deadlock-one',

'./cli app:deadlock-two'

],

dirname(__DIR__), // Каталог, из которого будет запускаться команда

1 // Количество запускаемых экземпляров команд

);

// Запуск проверки

$this->assertAsyncCommands($asnycCommand);

}

}


В результате запуска теста увидим причину ошибки:


$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php

PHPUnit 6.1.1 by Sebastian Bergmann and contributors.


F                                                                   1 / 1 (100%)


Time: 1.19 seconds, Memory: 4.00MB


There was 1 failure:


1) DeadlockCommandTest::testImport

Failed asserting that commands

./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1)

executed in parallel.

Output:



>>> Process 5481 started: ./cli app:deadlock-one

>>> Process 5481 started: ./cli app:deadlock-two



[Doctrine\DBAL\Exception\DriverException]

An exception occurred while executing 'UPDATE deadlock SET value = value +

1 WHERE id = 1':

SQLSTATE[40P01]: Deadlock detected: 7 ОШИБКА:  обнаружена взаимоблокировка

DETAIL:  Процесс 5498 ожидает в режиме ShareLock блокировку "транзакция 294

738"; заблокирован процессом 5499.

Процесс 5499 ожидает в режиме ShareLock блокировку "транзакция 294737"; заб

локирован процессом 5498.

HINT:  Подробности запроса смотрите в протоколе сервера.

CONTEXT:  при изменении кортежа (0,48) в отношении "deadlock"


-------


app:deadlock-two


Completed without deadlocks

<<< Process 5481 finished

<<< Process 5484 finished


.


/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39

/var/www/pprocess-playground/tests/DeadlockCommandTest.php:22


FAILURES!

Tests: 1, Assertions: 1, Failures: 1.


Проблема лечится добавлением блокировки по аналогии с первым примером. Либо пересмотром структуры базы или алгоритма работы с данными.

Резюмируем

При параллельном исполнении кода могут возникать неожиданные ситуации, при исправлении которых полезно написать тесты. Мы рассмотрели несколько таких ситуаций и написали тесты, воспользовавшись pprocess.

Обсудить в форуме.