С выходом PHP 8.1 у разработчиков, использующих ZTS (Zend Thread Safety) и расширение parallel, появился ещё один удобный инструмент для межпоточного взаимодействия. Речь идёт о разделяемой in-memory базе SQLite, которая настраивается одной строкой DSN. Такой подход оказывается проще и нагляднее, чем стандартные parallel\Channel или самописные сокеты.
PHP с включённым ZTS позволяет выполнять код одновременно в нескольких потоках. Расширение parallel предоставляет для этого высокоуровневый API: parallel\Runtime, parallel\Future, parallel\Channel. Потоки изолированы, и для передачи данных приходится использовать либо каналы (parallel\Channel), либо организовывать собственный механизм обмена данными через сокеты, файлы или разделяемую память. Это работает, но требует дополнительного кода/расширения/опыта и не всегда удобно.
Начиная с PHP 8.1, PDO SQLite поддерживает специальный формат DSN с URI, который позволяет открыть одну и ту же in-memory базу данных из разных потоков. Ключевая строка:
$pdo = new \PDO('sqlite:file:my_channel?mode=memory&cache=shared');file:my_channel— имя канала (может быть любым);mode=memory— база данных существует только в оперативной памяти;cache=shared— разделяемый кэш, позволяющий другим соединениям с тем же именем канала работать с той же самой базой.
Если создать такое соединение в основном потоке, а затем в каждом дочернем потоке открыть PDO с точно таким же DSN, все они будут видеть одни и те же таблицы, разделяя память между собой. Это превращает SQLite в идеальную «шину данных» для межпоточного общения.
Основной поток создаёт таблицу-задач и запускает несколько воркеров:
// главный поток
$channel = new \PDO('sqlite:file:tasks?mode=memory&cache=shared');
$channel->exec('CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload TEXT,
status TEXT
)');
$callback = static function() {
$channel = new \PDO('sqlite:file:tasks?mode=memory&cache=shared');
// Изменяем и получаем задачу, такой запрос защищает запись от других потоков
$sql = 'UPDATE tasks SET status = "progress" WHERE id IN (
SELECT id FROM tasks WHERE status = "pending" LIMIT 1
) RETURNING *';
while (true) {
// Забираем задачу, обрабатываем, обновляем статус
$stmt = $channel->prepare($sql);
$stmt->execute();
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
// Выполнение задачи
} else {
\usleep(10_000); // 10ms небольшая пауза, чтобы не нагружать CPU
}
}
};
// Запускаем потоки
$runtimes = [];
for ($i = 0; $i < 4; $i++) {
$runtime = new \parallel\Runtime();
$runtime->run($callback);
$runtimes[] = $runtime;
}
// Ждем завершения
foreach(runtimes as $runtime){
$runtime->close();
}Теперь в основном потоке можно добавлять задачи через то же PDO-соединение, а воркеры будут их забирать. Обмен можно сделать двухсторонним, тут уже как ваша фантазия позволяет это сделать.
- Используется обычный SQL, который понятен любому PHP-разработчику.
- API parallel\Channel, Events и Sync требует явного управления блокировками и очередями сообщений, что легко приводит к взаимоблокировкам (deadlock) при нарушении порядка захвата ресурсов или асинхронных операций send/recv. В отличие от декларативной работы с SQLite, где конкурентность обрабатывается на уровне базы данных.
- Можно хранить любые структуры данных (через JSON или сериализацию), делать сложные выборки, группировки, использовать индексы.
- Не нужно подключать дополнительные расширения Redis, Memcached и т.п., достаточно SQLite, который чаще всего уже есть в PHP.
- Сохраняйте ссылку на обьект PDO, если он удалится раньше чем потоки подключатся к БД, то вы потеряете данные.
- Если необходимо БД сохранить в файл, можете воспользоватся потрясающей SQL командой которая сохранит in-memory базу в файл.
VACUUM INTO "/path/to/file.sqlite"; - Все потоки должны использовать одинаковое имя канала (в примере –
tasks).
Создавать sqlite базу нужно до new Runtime