Ruby & PHP. Скрещиваем ужа с ежом с помощью Starling и Zend_Queue.
В этой статье я расскажу о животрепещущем для многих вопросе. Как соединить между собой приложения на разных языках. Например, Ruby и PHP. В Twitter проблему интеграции с очередью решили с помощью Starling. Вообще сейчас намечается тенденция, что для каждой задачи подбирают свой язык. Гомогенных систем становится всё меньше. В следствии этого возникает потребность в стандартах на интеграцию разношерстного ПО в единую систему.
Постановка задачи
Допустим у нас есть ситуация, при которой во время какого-либо HTTP запроса необходимо сделать длительную операцию. Например, при добавлении пользователем комментария к статье отправить email уведомление её автору. В классической схеме взаимодействия (рис 7-1) мы вынуждены ждать завершения этой операции, чтобы уведомить пользователя. Получается что эта операция блокирует выполнение веб-приложения, а значит происходит потеря производительности. Да и пользователю приходится ждать лишнее время. Когда количество пользователей невелико – это не проблема, но в Highload проектах этот вопрос приобретает большое значение. Мы определились с задачей – вынести все ресурсоемкие задачи в фоновые процессы. Рассмотрим случай с хостингом и добавлением домена в веб-панели. Задачей будет выполнение скрипта на Ruby, который будет производить манипуляции с DNS зонами и перезагружать DNS сервис.
Теперь определимся с схемой работы. Клиентом очереди будет ZendFramework приложение, которое будет добавлять задачи в очередь (отправлять сообщения в очередь). Сервером (воркером) будет Ruby, который будет принимать сообщения из очереди и обрабатывать их.
Почему выбрана именно такая комбинация? ZF для клиента исторически сложился, а вот сервер на Ruby выбран, т.к. очень не хочется давать PHP права root, думаю многие админы меня поймут. Выбор очереди был сделан в пользу Starling, т.к. она основана на Memcached и использует его протокол (ну почти 🙂 ) и реализация на руби предельно проста.
Что такое очереди сообщений
“А как вы организуете очередь?
– С помощью бабушек!”
с Хабрахабра
Тема очередей сообщений была навеяна докладом “Разделение труда: Организация многозадачной, распределенной системы в Zend Framework с помощью Job Queue” Александра Готгельфа на последнем ZFCONF. Он работал с сервером очередей Gearman, и рассказал о его немногочисленных, но очень серьезных багах (таких, как проблема освобождения памяти). В связи с этим я начал смотреть в торону других и остановился на MemcacheQ и Starling, который очень советовали пользователи хабра. А т.к. имплементация на руби проще со вторым он и был выбран.
Что такое Starling?
Starling – это скворец, певчая птица семейства скворцовых, широко распространённая на значительной территории Евразии, а также успешно интродуцированная в Южную Африку, Северную Америку, Австралию и Новую Зеландию. На юге и западе Европы ведёт оседлый образ жизни, а в северной и восточной её части является перелётной, в зимние месяцы мигрируя на юг. Внешне (размерами, желтым клювом и темным оперением) слегка напоминает чёрных дроздов, но в отличие от них ходит по земле, а не прыгает. 🙂
А ещё так называется очередь сообщений, написанная на Ruby. Её используют в Twitter! Для установки сервера очередей у вас уже должен быть установлен Ruby. Сам же Starling ставится очень просто. Следующая строка выполняется только один раз!
1 |
gem sources -a http://gems.github.com/ |
Ставим гем (можно делать сколько угодно раз).
1 |
sudo gem install starling |
Если вы ставили руби для всех юзеров, то выполняем вместо sudo rvmsudo
Дальше надо запустить сервер и можно коннектить серверов и клиентов.
1 2 3 |
andrey@comp$ starling Starting at 127.0.0.1:22122. I, [2011-05-20T10:53:34.346297 #6017] INFO -- : Starling STARTUP on 127.0.0.1:22122 |
Клиентами будем называть тех, кто отправляет задания в очередь, а серверами – тех кто получает и обслуживает их (to serve). Теперь можно подключать клиентов на руби и php.
Общий формат данных. Ruby маршализация в PHP
Первое с чем я столкнулся это формат хранения сообщений в очереди. На выбор было несколько вариантов.
- [б] Маршализация (marshalling) объектов. Так делает руби и это дефолтовый способ работы Starling,
- [т] Сериализация (serializing). Так хранит объекты Zend Framework, а именно Zend_Queue,
- [т] JSON. Стандарт де-факто для взаимодействия в гетерогенных средах,
- [т] XML. Стандарт де-юро, однако имеет больший расход памяти для хранения сообщений, т.к. имеет теговую природу,
где [т] – текстовый формат, [б] -бинарный формат.
Скажу так, в начале у меня был соблазн вообще не трогать Starling, а научиться в PHP работать с маршализованными руби объектами. Но ничего похожего я найти не смог. Использовать XML не хотелось, для работы с JSON надо было бы здорово переписывать сервер очередей. Вот тогда то мне на глаза и попался гем php_serialize.
Ruby + Starling
Делает он ровно то, что я от него и ожидал, а именно выполняет serialize() функцию над Ruby объектами и наоборот. Итак схема такова: php -> serialize -> starling -> unserialize -> ruby. Для этого правда всё равно пришлось коё-что переписать в Starling. Приведу здесь код простейшего воркера очереди.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
require 'starling' require 'php_serialize' class Starling def get_raw(key, raw = false) with_server(key) do |server, cache_key| logger.debug { "get #{key} from #{server.inspect}" } if logger value = cache_get server, cache_key return nil if value.nil? value = PHP.unserialize value unless raw return value end rescue TypeError => err handle_error nil, err end end # -------------------------------------------------- starling = Starling.new('0.0.0.0:22122') puts starling.sizeof('myqueue1') puts starling.get_raw('myqueue1') exit |
Как видите, вместо Marshal.load из MemCache::get и использую PHP::unserialize и пропустив один уровень наследования (MemCache) для того, чтобы не поломать обычный MemCache клиент я внедрил его в класс Starling. Отлчино, теперь примемся за вторую часть нашей системы.
PHP + Starling (ZendExtra_Queue_Adapter_Starling)
В Zend Framework уже есть неплохой класс работы с очередями – Zend_Queue, но к сожалению он не поддерживает Starling. Для внедрения поддержки создадим наш адаптер, который отнаследуем от ближайшего родственника (Zend_Queue_Adapter_MemcacheQ).
В Зенд-Кью (а именно так читается Zend_Queue) очень хорошо реализована работа с адаптерами хранилищ. Каждый из них очень разный, и паттерн “Стратегия” тут бы не подошёл, однако авторы нашли выход и добавили функцию getCapabilities(), которая возвращает масси возможностей хранилища.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/** * Supporting abilities * * @return array */ public function getCapabilities() { return array( 'create' => true, 'delete' => true, 'send' => true, 'receive' => true, 'deleteMessage' => false, 'getQueues' => true, 'count' => true, 'isExists' => true, ); } |
Для нашего хранилища есть возможность получить количество сообщений в очереди на обслуживание, но его надо реализовать. В написал/переписал/дописал часть функций адаптера в результате родился
ZendExtra_Queue_Adapter_Starling
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
<?php /** * Adapter for storage messags in Starling queue * Note: messages are stored serialized * After recieving outside this adapter unserialization is needed * For Ruby you can use gem "php_serialize" (not "php-serialize") */ require_once 'Zend/Queue/Adapter/AdapterAbstract.php'; class ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq { const DEFAULT_HOST = '127.0.0.1'; const DEFAULT_PORT = 22122; const EOL = "\r\n"; /** * Supporting abilities * * @return array */ public function getCapabilities() { return array( 'create' => true, 'delete' => true, 'send' => true, 'receive' => true, 'deleteMessage' => false, 'getQueues' => true, 'count' => true, 'isExists' => true, ); } public function __construct($options, Zend_Queue $queue = null) { if (!extension_loaded('memcache')) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded'); } Zend_Queue_Adapter_AdapterAbstract::__construct($options, $queue); // instead of parent::, which is MemcachedQ $options = &$this->_options['driverOptions']; if (!array_key_exists('host', $options)) { $options['host'] = self::DEFAULT_HOST; } if (!array_key_exists('port', $options)) { $options['port'] = self::DEFAULT_PORT; } $this->_cache = new Memcache(); $result = $this->_cache->connect($options['host'], $options['port']); if ($result === false) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Could not connect to MemcacheQ'); } $this->_host = $options['host']; $this->_port = (int)$options['port']; } /** * Return queues list * * @return array */ public function getQueues() { $this->_queues = array(); $response = $this->_sendCommand('stats', array('END')); $postfixesArray = array('_items', '_total_items', '_logsize', '_expired_items', '_age', '_total', '_expired'); $arr = array(); foreach ($response as $i => $line) { if(strpos($line, 'STAT queue_') === 0 ) { // Zero position (start), not false $tmp = str_replace('STAT queue_', '', $line); $tmp = explode(' ', $tmp); $tmp = $tmp[0]; foreach($postfixesArray as $postfix) { $tmp = str_replace($postfix, '', $tmp); } $arr[] = $tmp; } } $this->_queues = array_unique($arr); return $this->_queues; } /** * Return the approximate number of messages in the queue * * @return integer * @throws Zend_Queue_Exception (not supported) */ public function count(Zend_Queue $queue = null) { if($queue === NULL) { $keyName = 'queue_'.$this->getQueue()->getName().'_items'; } else { $keyName = 'queue_'.$queue->getName().'_items'; } $response = $this->_sendCommand('stats', array('END')); foreach($response as $line) { if(strpos($line, $keyName) !== false) { $tmp = str_replace('STAT '.$keyName.' ', '', $line); return (int) $tmp; } } return 0; } /** * Get messages in the queue * * @param integer $maxMessages Maximum number of messages to return * @param integer $timeout Visibility timeout for these messages * @param Zend_Queue $queue * @return Zend_Queue_Message_Iterator * @throws Zend_Queue_Exception */ public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null) { if ($maxMessages === null) { $maxMessages = 1; } if ($timeout === null) { $timeout = self::RECEIVE_TIMEOUT_DEFAULT; } if ($queue === null) { $queue = $this->_queue; } $msgs = array(); // Setting up the limit upon to queue count if($maxMessages != NULL) { $count = $this->count($queue); if($count < $maxMessages) $maxMessages = $count; } if ($maxMessages > 0 ) { for ($i = 0; $i < $maxMessages; $i++) { $data = unserialize($this->_cache->get($queue->getName())); $msgs[] = $data; } } $options = array( 'queue' => $queue, 'data' => $msgs, 'messageClass' => $queue->getMessageClass(), ); $classname = $queue->getMessageSetClass(); if (!class_exists($classname)) { require_once 'Zend/Loader.php'; Zend_Loader::loadClass($classname); } return new $classname($options); } /** * Delete a queue and all of it's messages * * Returns false if the queue is not found, true if the queue exists * * @param string $name queue name * @return boolean * @throws Zend_Queue_Exception */ public function delete($name) { $queue = $this->getQueue($name); $count = $this->count($queue); $this->receive($count, NULL, $queue); } /** * Send a message to the queue * * @param string $message Message to send to the active queue * @param Zend_Queue $queue * @return Zend_Queue_Message * @throws Zend_Queue_Exception */ public function send($message, Zend_Queue $queue=null) { if ($queue === null) { $queue = $this->_queue; } if (!$this->isExists($queue->getName())) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName()); } $message = serialize($message); $data = array( 'message_id' => md5(uniqid(rand(), true)), 'handle' => null, 'body' => $message, 'md5' => md5($message), ); $result = $this->_cache->set($queue->getName(), $message, 0, 0); if ($result === false) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName()); } $options = array( 'queue' => $queue, 'data' => $data, ); $classname = $queue->getMessageClass(); if (!class_exists($classname)) { require_once 'Zend/Loader.php'; Zend_Loader::loadClass($classname); } return new $classname($options); } } |
А вот и пример инициализации класса и работы с ним:
StarlingZF.php
1 2 3 |
$queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1')); $queuePool->send(array('domain' => 'site.ru')); var_dump($queuePool->receive()->toArray()); // Messages also deleting from queue |
Или тоже самое но на уровне PHP:
StarlingPHP.php
1 2 3 4 |
connect('localhost', 22122) or die ("Could not connect"); $data = array('domain' => 'test2.ru'); $memcache->set('myqueue1', serialize($data)) or die ("Failed to save data at the server"); print 'ok'; |
Как видите всё не так сложно. Теперь можно даже написать утилиту мониторинга (если вам не подходят существующие), которая будет следить за очередью.
Преимущества и недостатки Starling
Давайте посмотрим, что у нас получилось, и что нам это даёт.
- Скорость. Данные хранятся в Memcache, который славится своей производительностью.
- Масштабируемость. Очередь может быть распределена на несколько серверов.
- Стандартизированность и унификация. Используется стандартный Memcached-протокол.
Но есть и недостатки.
- Надежность. Т.к. все данные хранятся в памяти, то при перезапуске сервера задачи – теряются.
Но в некоторых случаях это может быть не критично. Например в случае хостера отметка о завершении задачи ставится в конце неё, и при необходимости клиентская часть сможет повторить запрос. В случае отправки email -есть внешняя очередь Postfix, куда ставятся сообщения на отправку, а Starling является буферной очередью для почтового сервера. В случае ресайза картинок, задача на ресайз может быть поставлена при повторном обращении ккартинке по проверки условия выполнения задачи. Вот, что пишут про неё сами разработчики.
Система надёжна, быстра и использует стандартный протокол memcached. По заверениям разработчиков, это вообще самое стабильное звено твиттера. Когда другие элементы системы отключаются, Starling всегда продолжает работать.
Так что не всё так плохо, важно понимать, что идеальных решений не существует и любое из них компромисс в ту или иную сторону и определитсья с приоритетами. Если приоритет – Highload, то, думаю, сквоец – отлчиное решение!
Литература
http://www.aagh.net/projects/ruby-php-serialize
http://rubypond.com/blog/the-complete-guide-to-setting-up-starling
https://github.com/highgroove/scout-plugins/raw/master/starling_monitor/starling_monitor.rb
https://github.com/starling/starling
http://rubyjunction.us/ruby-asynchronous-messaging
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/
ZF Proposal
Автору спасобо, давно искал что то подобное но php + Ruby это лучший вариант.
Мсье знает толк в извращениях))
Пара вопросов:
1) Зачем было строить Zoo? На клиенте рельсы, было бы проще
2) Если так нужен был ZF почему бы не смотреть на другие решения, типа RabbitMQ – там есть хорошие реализации и примеры на разные языки.
3) Для простых задач, не смотрели Delayed Job на ruby?
Это был 2011, мы выживали, как могли) А если серьезно, то:
1) Зоопарк уже был, был и демон на Ruby и приложение на ZF. Оставалось выбрать очередь.
2) Эта показалась наиболее простой, да так в принципе и есть. А главное есть клиенты под все языки. По этому и выбрал. Из-за интерфейса.
3) Смотрел, но клиент то у меня на ZF. Надо оттуда задачи создавать.
Кстати сейчас реализую подобную задачу. Уже совершенно по другому. На основе аатомарных обновлений в MongoDB. Клиенты (Symfony2 приложение) и сервер (PHP libevent демон очереди) будут юзать монгу для её хранения.
Ясно все с вами) Выживать это хорошо. Да и очереди не плохо наверное, хотя мне пока delayed’a за глаза. Но смотрю на rabbitmq.
А вот про MongoDB можно подробнее? Вернее как это там выглядит (в документацию лезть не хочу пока). Я рельсовик, поэтому не знаю что такое libevent…могу предположить, что это что-то вроде нашего EventMachine ?
Ага, насколько я знаю EventMachine это такая же обертка для системной Libev/libevent как и расширение php_libevent. Про монгу потом подробнее напишу отдельную статью. Основная идея в findAndModify см. http://www.slideshare.net/mongodb/mongodb-as-message-queue Там кстати по тестам такая штука оказалась быстрее, чем RabbitMQ 🙂
Хм, а я решил познать python для работы с демонами и фоновыми процессами. Все-так если использовать сервисы сообщений, то можно извращаться над разными технологиями, а системные утилитки писать на python сподручнее будет (да простит меня любимый ruby)…А разве в php нету реализации Delaed Job ? А монго для меня пока закрытая книга, несмотря на то, что я ее сдуру подключил к одному проекту (решил поизвращаться) но выяснилось, что мне возможностей РСУБД хвататает. Наверное проблема в том, что я сразу залез в нее через ODM Mongoid (это реализация ActiveRecord под MongoDB) и там это все настолько похоже, что прям удобней не бывает…поэтому некоторые мощный возможности все-же скрыты за “магией”…
Да, Сергей, Mongoid говорят весьма и весьма неплох. Насчёт магии ODM – это точно. Не зная, что там ниже лежит можно всякую фигню понаписать. Но нам хорошо, юзаем профайлер от Sf2 тулбара, он нам все-все-все запросы показывает и косяки выплывают, если есть. А по поводу языка для системных утилит, ну если потечет вдруг демон я его просто убиваю (по лимиту потребляемой памяти), а supervisord палит это дело и сразу перезапускает. Очередь – persistant, так что это ничего страшного. Что до Delaed Job – то как раз пишу сейчас подобную штуку.
Я бы не сказал, что там можно фигню написать…в общем то, как раз, фигню написать и не получится, он там все по полкам раскидывает, как и AD. С другой стороны дальше “простых вещей” оттуда вряд ли можно добраться. Меня в Монго изначально подкупила их возможность на лету создавать атрибуты, что бы не париться с EAV моделью данных…
Ну дык schemaless во всей красе. Правда больше обработки в приложении надо делать, но это ничего. Бекэндов можно много понаделать.