Ruby & PHP. Скрещиваем ужа с ежом с помощью Starling и Zend_Queue.

// Май 21st, 2011 // Highload, PHP, Ruby, RVM, Веб-разработка

В этой статье я расскажу о животрепещущем для многих вопросе. Как соединить между собой приложения на разных языках. Например, Ruby и PHP. В Twitter проблему интеграции с очередью решили с помощью Starling. Вообще сейчас намечается тенденция, что для каждой задачи подбирают свой язык. Гомогенных систем становится всё меньше. В следствии этого возникает потребность в стандартах на интеграцию разношерстного ПО в единую систему.

 

Постановка задачи

Допустим у нас есть ситуация, при которой во время какого-либо HTTP запроса необходимо сделать длительную операцию. Например, при добавлении пользователем комментария к статье отправить email уведомление её автору. В классической схеме взаимодействия (рис 7-1) мы вынуждены ждать завершения этой операции, чтобы уведомить пользователя. Получается что эта операция блокирует выполнение веб-приложения, а значит происходит потеря производительности. Да и пользователю приходится ждать лишнее время. Когда количество пользователей невелико — это не проблема, но в Highload проектах этот вопрос приобретает большое значение. Мы определились с задачей — вынести все ресурсоемкие задачи в фоновые процессы. Рассмотрим случай с хостингом и добавлением домена в веб-панели. Задачей будет выполнение скрипта на Ruby, который будет производить манипуляции с DNS зонами и перезагружать DNS сервис.

Теперь определимся с схемой работы. Клиентом очереди будет ZendFramework приложение, которое будет добавлять задачи в очередь (отправлять сообщения в очередь). Сервером (воркером) будет Ruby, который будет принимать сообщения из очереди и обрабатывать их.

Почему выбрана именно такая комбинация? ZF для клиента исторически сложился, а вот сервер на Ruby выбран, т.к. очень не хочется давать PHP права root, думаю многие админы меня поймут. Выбор очереди был сделан в пользу Starling, т.к. она основана на Memcached и использует его протокол (ну почти :-) ) и реализация на руби предельно проста.

Что такое очереди сообщений

«А как вы организуете очередь?
- С помощью бабушек!»

с Хабрахабра

Тема очередей сообщений была навеяна докладом «Разделение труда: Организация многозадачной, распределенной системы в Zend Framework с помощью Job Queue»  Александра Готгельфа на последнем ZFCONF. Он работал с сервером очередей Gearman, и рассказал о его немногочисленных, но очень серьезных багах (таких, как проблема освобождения памяти). В связи с этим я начал смотреть в торону других и остановился на MemcacheQ и Starling, который очень советовали пользователи хабра. А т.к. имплементация на руби проще со вторым он и был выбран.

Что такое Starling?

Starling — это скворец, певчая птица семейства скворцовых, широко распространённая на значительной территории Евразии, а также успешно интродуцированная в Южную Африку, Северную Америку, Австралию и Новую Зеландию. На юге и западе Европы ведёт оседлый образ жизни, а в северной и восточной её части является перелётной, в зимние месяцы мигрируя на юг. Внешне (размерами, желтым клювом и темным оперением) слегка напоминает чёрных дроздов, но в отличие от них ходит по земле, а не прыгает. :-)

А ещё так называется очередь сообщений, написанная на Ruby. Её используют в Twitter! Для установки сервера очередей у вас уже должен быть установлен Ruby. Сам же Starling ставится очень просто. Следующая строка выполняется только один раз!

gem sources -a http://gems.github.com/

Ставим гем (можно делать сколько угодно раз).

sudo gem install starling

Если вы ставили руби для всех юзеров, то выполняем вместо sudo rvmsudo

Дальше надо запустить сервер и можно коннектить серверов и клиентов.

andrey@comp$ starling
Starting at 127.0.0.1:22122.
I, [2011-05-20T10:53:34.346297 #6017]  INFO -- : Starling STARTUP on 127.0.0.1:22122

Клиентами будем называть тех, кто отправляет задания в очередь, а серверами — тех кто получает и обслуживает их (to serve). Теперь можно подключать клиентов на руби и php.

Общий формат данных. Ruby маршализация в PHP

Первое с чем я столкнулся это формат хранения сообщений в очереди. На выбор было несколько вариантов.

  • [б] Маршализация (marshalling) объектов.  Так делает руби и это дефолтовый способ работы Starling,
  • [т] Сериализация (serializing). Так хранит объекты Zend Framework, а именно Zend_Queue,
  • [т] JSON. Стандарт де-факто для взаимодействия в гетерогенных средах,
  • [т] XML. Стандарт де-юро, однако имеет больший расход памяти для хранения сообщений, т.к. имеет теговую природу,

где [т] — текстовый формат, [б] -бинарный формат.

Скажу так, в начале у меня был соблазн вообще не трогать Starling, а научиться в PHP работать с маршализованными руби объектами. Но ничего похожего я найти не смог. Использовать XML не хотелось, для работы с JSON надо было бы здорово переписывать сервер очередей. Вот тогда то мне на глаза и попался гем php_serialize.

Ruby + Starling

Делает он ровно то, что я от него и ожидал, а именно выполняет serialize() функцию над Ruby объектами и наоборот. Итак схема такова: php -> serialize -> starling -> unserialize -> ruby. Для этого правда всё равно пришлось коё-что переписать в Starling. Приведу здесь код простейшего воркера очереди.

require 'starling'
require 'php_serialize'

class Starling
 def get_raw(key, raw = false)
 with_server(key) do |server, cache_key|
 logger.debug { "get #{key} from #{server.inspect}" } if logger
 value = cache_get server, cache_key
 return nil if value.nil?
 value = PHP.unserialize value unless raw
 return value
 end
 rescue TypeError => err
 handle_error nil, err
 end
end
# --------------------------------------------------
starling = Starling.new('0.0.0.0:22122')

puts starling.sizeof('myqueue1')
puts starling.get_raw('myqueue1')

exit

Как видите, вместо Marshal.load из MemCache::get и использую PHP::unserialize и пропустив один уровень наследования (MemCache) для того, чтобы не поломать обычный MemCache клиент я внедрил его в класс Starling. Отлчино, теперь примемся за вторую часть нашей системы.

PHP + Starling (ZendExtra_Queue_Adapter_Starling)

В Zend Framework уже есть неплохой класс работы с очередями — Zend_Queue, но к сожалению он не поддерживает Starling. Для внедрения поддержки создадим наш адаптер, который отнаследуем от ближайшего родственника (Zend_Queue_Adapter_MemcacheQ).

В Зенд-Кью (а именно так читается Zend_Queue) очень хорошо реализована работа с адаптерами хранилищ. Каждый из них очень разный, и паттерн «Стратегия» тут бы не подошёл, однако авторы нашли выход и добавили функцию getCapabilities(), которая возвращает масси возможностей хранилища.

    /**
     * Supporting abilities
     *
     * @return array
     */
    public function getCapabilities()
    {
        return array(
            'create'        => true,
            'delete'        => true,
            'send'          => true,
            'receive'       => true,
            'deleteMessage' => false,
            'getQueues'     => true,
            'count'         => true,
            'isExists'      => true,
        );
    }

Для нашего хранилища есть возможность получить количество сообщений в очереди на обслуживание, но его надо реализовать. В написал/переписал/дописал часть функций адаптера в результате родился
ZendExtra_Queue_Adapter_Starling

<?php

/**
 * Adapter for storage messags in Starling queue
 * Note: messages are stored serialized
 * After recieving outside this adapter unserialization is needed
 * For Ruby you can use gem "php_serialize" (not "php-serialize")
 */
require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
class ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq
{
 const DEFAULT_HOST = '127.0.0.1';
 const DEFAULT_PORT = 22122;
 const EOL          = "\r\n";

 /**
 * Supporting abilities
 *
 * @return array
 */
 public function getCapabilities()
 {
 return array(
 'create'        => true,
 'delete'        => true,
 'send'          => true,
 'receive'       => true,
 'deleteMessage' => false,
 'getQueues'     => true,
 'count'         => true,
 'isExists'      => true,
 );
 }

 public function __construct($options, Zend_Queue $queue = null)
 {
 if (!extension_loaded('memcache')) {
 require_once 'Zend/Queue/Exception.php';
 throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
 }

 Zend_Queue_Adapter_AdapterAbstract::__construct($options, $queue); // instead of parent::, which is MemcachedQ

 $options = &$this->_options['driverOptions'];

 if (!array_key_exists('host', $options)) {
 $options['host'] = self::DEFAULT_HOST;
 }
 if (!array_key_exists('port', $options)) {
 $options['port'] = self::DEFAULT_PORT;
 }

 $this->_cache = new Memcache();

 $result = $this->_cache->connect($options['host'], $options['port']);

 if ($result === false) {
 require_once 'Zend/Queue/Exception.php';
 throw new Zend_Queue_Exception('Could not connect to MemcacheQ');
 }

 $this->_host = $options['host'];
 $this->_port = (int)$options['port'];
 }

 /**
 * Return queues list
 *
 * @return array
 */
 public function getQueues()
 {
 $this->_queues = array();

 $response = $this->_sendCommand('stats', array('END'));

 $postfixesArray = array('_items', '_total_items', '_logsize', '_expired_items', '_age', '_total', '_expired');
 $arr = array();
 foreach ($response as $i => $line) {
 if(strpos($line, 'STAT queue_') === 0 ) { // Zero position (start), not false
 $tmp = str_replace('STAT queue_', '', $line);
 $tmp = explode(' ', $tmp);
 $tmp = $tmp[0];
 foreach($postfixesArray as $postfix) {
 $tmp = str_replace($postfix, '', $tmp);
 }
 $arr[] = $tmp;
 }
 }
 $this->_queues = array_unique($arr);
 return $this->_queues;
 }

 /**
 * Return the approximate number of messages in the queue
 *
 * @return integer
 * @throws Zend_Queue_Exception (not supported)
 */
 public function count(Zend_Queue $queue = null)
 {
 if($queue === NULL) {
 $keyName = 'queue_'.$this->getQueue()->getName().'_items';
 } else {
 $keyName = 'queue_'.$queue->getName().'_items';
 }
 $response = $this->_sendCommand('stats', array('END'));
 foreach($response as $line) {
 if(strpos($line, $keyName) !== false) {
 $tmp = str_replace('STAT '.$keyName.' ', '', $line);
 return (int) $tmp;
 }
 }
 return 0;
 }

 /**
 * Get messages in the queue
 *
 * @param  integer    $maxMessages  Maximum number of messages to return
 * @param  integer    $timeout      Visibility timeout for these messages
 * @param  Zend_Queue $queue
 * @return Zend_Queue_Message_Iterator
 * @throws Zend_Queue_Exception
 */
 public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
 {
 if ($maxMessages === null) {
 $maxMessages = 1;
 }

 if ($timeout === null) {
 $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
 }
 if ($queue === null) {
 $queue = $this->_queue;
 }

 $msgs = array();

 // Setting up the limit upon to queue count
 if($maxMessages != NULL) {
 $count = $this->count($queue);
 if($count < $maxMessages)
 $maxMessages = $count;
 }

 if ($maxMessages > 0 ) {
 for ($i = 0; $i < $maxMessages; $i++) {
 $data = unserialize($this->_cache->get($queue->getName()));
 $msgs[] = $data;
 }
 }

 $options = array(
 'queue'        => $queue,
 'data'         => $msgs,
 'messageClass' => $queue->getMessageClass(),
 );
 $classname = $queue->getMessageSetClass();
 if (!class_exists($classname)) {
 require_once 'Zend/Loader.php';
 Zend_Loader::loadClass($classname);
 }
 return new $classname($options);
 }

 /**
 * Delete a queue and all of it's messages
 *
 * Returns false if the queue is not found, true if the queue exists
 *
 * @param  string  $name queue name
 * @return boolean
 * @throws Zend_Queue_Exception
 */
 public function delete($name)
 {
 $queue = $this->getQueue($name);
 $count = $this->count($queue);
 $this->receive($count, NULL, $queue);
 }

 /**
 * Send a message to the queue
 *
 * @param  string     $message Message to send to the active queue
 * @param  Zend_Queue $queue
 * @return Zend_Queue_Message
 * @throws Zend_Queue_Exception
 */
 public function send($message, Zend_Queue $queue=null)
 {
 if ($queue === null) {
 $queue = $this->_queue;
 }

 if (!$this->isExists($queue->getName())) {
 require_once 'Zend/Queue/Exception.php';
 throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
 }

 $message = serialize($message);
 $data    = array(
 'message_id' => md5(uniqid(rand(), true)),
 'handle'     => null,
 'body'       => $message,
 'md5'        => md5($message),
 );

 $result = $this->_cache->set($queue->getName(), $message, 0, 0);
 if ($result === false) {
 require_once 'Zend/Queue/Exception.php';
 throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
 }

 $options = array(
 'queue' => $queue,
 'data'  => $data,
 );

 $classname = $queue->getMessageClass();
 if (!class_exists($classname)) {
 require_once 'Zend/Loader.php';
 Zend_Loader::loadClass($classname);
 }
 return new $classname($options);
 }

}

А вот и пример инициализации класса и работы с ним:
StarlingZF.php

$queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
$queuePool->send(array('domain' => 'site.ru'));
var_dump($queuePool->receive()->toArray()); // Messages also deleting from queue

Или тоже самое но на уровне PHP:
StarlingPHP.php

connect('localhost', 22122) or die ("Could not connect");
$data = array('domain' => 'test2.ru');
$memcache->set('myqueue1', serialize($data)) or die ("Failed to save data at the server");
print 'ok';

Как видите всё не так сложно. Теперь можно даже написать утилиту мониторинга (если вам не подходят существующие), которая будет следить за очередью.

Преимущества и недостатки Starling

Давайте посмотрим, что у нас получилось, и что нам это даёт.

  • Скорость. Данные хранятся в Memcache, который славится своей производительностью.
  • Масштабируемость. Очередь может быть распределена на несколько серверов.
  • Стандартизированность и унификация. Используется стандартный Memcached-протокол.

Но есть и недостатки.

  • Надежность. Т.к. все данные хранятся в памяти, то при перезапуске сервера задачи — теряются.

Но в некоторых случаях это может быть не критично. Например в случае хостера отметка о завершении задачи ставится в конце неё, и при необходимости клиентская часть сможет повторить запрос. В случае отправки email -есть внешняя очередь Postfix, куда ставятся сообщения на отправку, а Starling является буферной очередью для почтового сервера. В случае ресайза картинок, задача на ресайз может быть поставлена при повторном обращении ккартинке по проверки условия выполнения задачи. Вот, что пишут про неё сами разработчики.

Система надёжна, быстра и использует стандартный протокол memcached. По заверениям разработчиков, это вообще самое стабильное звено твиттера. Когда другие элементы системы отключаются, Starling всегда продолжает работать.

Так что не всё так плохо, важно понимать, что идеальных решений не существует и любое из них компромисс в ту или иную сторону и определитсья с приоритетами. Если приоритет — Highload, то, думаю, сквоец — отлчиное решение!

Литература

http://www.aagh.net/projects/ruby-php-serialize
http://rubypond.com/blog/the-complete-guide-to-setting-up-starling
https://github.com/highgroove/scout-plugins/raw/master/starling_monitor/starling_monitor.rb
https://github.com/starling/starling
http://rubyjunction.us/ruby-asynchronous-messaging
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/
ZF Proposal




coded by nessus
Share















Смотрите также:

Спасибо!


Если вам помогла статья, или вы хотите поддержать мои исследования и блог - вот лучший способ сделать это:


9 Responses to “Ruby & PHP. Скрещиваем ужа с ежом с помощью Starling и Zend_Queue.”

  1. Fm55:

    Автору спасобо, давно искал что то подобное но php + Ruby это лучший вариант.

  2. Сергий:

    Мсье знает толк в извращениях))
    Пара вопросов:
    1) Зачем было строить Zoo? На клиенте рельсы, было бы проще
    2) Если так нужен был ZF почему бы не смотреть на другие решения, типа RabbitMQ — там есть хорошие реализации и примеры на разные языки.
    3) Для простых задач, не смотрели Delayed Job на ruby?

    • google.com Андрей Токарчук:

      Это был 2011, мы выживали, как могли) А если серьезно, то:
      1) Зоопарк уже был, был и демон на Ruby и приложение на ZF. Оставалось выбрать очередь.
      2) Эта показалась наиболее простой, да так в принципе и есть. А главное есть клиенты под все языки. По этому и выбрал. Из-за интерфейса.
      3) Смотрел, но клиент то у меня на ZF. Надо оттуда задачи создавать.
      Кстати сейчас реализую подобную задачу. Уже совершенно по другому. На основе аатомарных обновлений в MongoDB. Клиенты (Symfony2 приложение) и сервер (PHP libevent демон очереди) будут юзать монгу для её хранения.

      • Сергий:

        Ясно все с вами) Выживать это хорошо. Да и очереди не плохо наверное, хотя мне пока delayed’a за глаза. Но смотрю на rabbitmq.
        А вот про MongoDB можно подробнее? Вернее как это там выглядит (в документацию лезть не хочу пока). Я рельсовик, поэтому не знаю что такое libevent…могу предположить, что это что-то вроде нашего EventMachine ?

        • google.com Андрей Токарчук:

          Ага, насколько я знаю EventMachine это такая же обертка для системной Libev/libevent как и расширение php_libevent. Про монгу потом подробнее напишу отдельную статью. Основная идея в findAndModify см. http://www.slideshare.net/mongodb/mongodb-as-message-queue Там кстати по тестам такая штука оказалась быстрее, чем RabbitMQ :-)

  3. Сергий:

    Хм, а я решил познать python для работы с демонами и фоновыми процессами. Все-так если использовать сервисы сообщений, то можно извращаться над разными технологиями, а системные утилитки писать на python сподручнее будет (да простит меня любимый ruby)…А разве в php нету реализации Delaed Job ? А монго для меня пока закрытая книга, несмотря на то, что я ее сдуру подключил к одному проекту (решил поизвращаться) но выяснилось, что мне возможностей РСУБД хвататает. Наверное проблема в том, что я сразу залез в нее через ODM Mongoid (это реализация ActiveRecord под MongoDB) и там это все настолько похоже, что прям удобней не бывает…поэтому некоторые мощный возможности все-же скрыты за «магией»…

    • google.com Андрей Токарчук:

      Да, Сергей, Mongoid говорят весьма и весьма неплох. Насчёт магии ODM — это точно. Не зная, что там ниже лежит можно всякую фигню понаписать. Но нам хорошо, юзаем профайлер от Sf2 тулбара, он нам все-все-все запросы показывает и косяки выплывают, если есть. А по поводу языка для системных утилит, ну если потечет вдруг демон я его просто убиваю (по лимиту потребляемой памяти), а supervisord палит это дело и сразу перезапускает. Очередь — persistant, так что это ничего страшного. Что до Delaed Job — то как раз пишу сейчас подобную штуку.

      • Сергий:

        Я бы не сказал, что там можно фигню написать…в общем то, как раз, фигню написать и не получится, он там все по полкам раскидывает, как и AD. С другой стороны дальше «простых вещей» оттуда вряд ли можно добраться. Меня в Монго изначально подкупила их возможность на лету создавать атрибуты, что бы не париться с EAV моделью данных…

        • google.com Андрей Токарчук:

          Ну дык schemaless во всей красе. Правда больше обработки в приложении надо делать, но это ничего. Бекэндов можно много понаделать.

Комментировать