Revision: 66967
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at July 26, 2014 05:31 by ryantxr
Initial Code
<?php use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitQueue { public $rabbit; public function connect($server = RABBIT_SERVER, $port = RABBIT_PORT, $user = RABBIT_USER, $pass = RABBIT_PASS, $vhost = RABBIT_VHOST) { // TODO - this can be cleaned up once Cake supports namespaces... $path = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Connection' . DS; $channel = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Channel' . DS; $wireio = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS . 'IO' . DS; $wire = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS; $helper = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS; $helperprotocol = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS . 'Protocol' . DS; $exception = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Exception' . DS; $message = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Message' . DS; include_once($exception . 'AMQPExceptionInterface.php'); include_once($exception . 'AMQPRuntimeException.php'); include_once($exception . 'AMQPProtocolException.php'); include_once($exception . 'AMQPProtocolChannelException.php'); include_once($wire . 'GenericContent.php'); include_once($message . 'AMQPMessage.php'); include_once($helperprotocol . 'MethodMap091.php'); include_once($helperprotocol . 'Wait091.php'); include_once($helperprotocol . 'Protocol091.php'); include_once($wire . 'Constants091.php'); include_once($helper . 'MiscHelper.php'); include_once($wire . 'AMQPWriter.php'); include_once($wire . 'AMQPReader.php'); include_once($wireio . 'AbstractIO.php'); include_once($wireio . 'StreamIO.php'); include_once($channel . 'AbstractChannel.php'); include_once($channel . 'AMQPChannel.php'); include_once($path . 'AbstractConnection.php'); include_once($path . 'AMQPStreamConnection.php'); include_once($path . 'AMQPConnection.php'); $this->server = $server; $this->port = $port; $this->user = $user; $this->pass = $pass; $this->vhost = $vhost; $this->rabbit = new AMQPConnection($server, $port, $user, $pass, $vhost); } public function consumeSetup($queueName, $callback, $exchangeName = RABBIT_ADAM_EXCHANGE) { $immediateChannel = $this->rabbit->channel(); $immediateChannel->queue_declare($queueName, false, true, false, false); $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false); $immediateChannel->queue_bind($queueName, $exchangeName, $queueName); $immediateChannel->basic_consume($queueName, 'consumer122', false, false, false, false, $callback); return $immediateChannel; } public function delete($message) { $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } public function getJob($message) { return unserialize($message->body); } public function publish($queueName, $job, $delay, $exchangeName = RABBIT_ADAM_EXCHANGE, $options=array()) { $immediateChannel = $this->rabbit->channel(); // class AMQPChannel ... // function queue_declare( // @param $queue = "" // @param $passive = false // @param $durable = false // @param $exclusive = false // @param $auto_delete = true // @param $nowait = false // @param $arguments = null // @param $ticket = null $immediateChannel->queue_declare($queueName, false, true, false, false); // function exchange_declare( // @param $exchange // @param $type // @param $passive = false // @param $durable = false // @param $auto_delete = true // @param $internal = false // @param $nowait = false // @param $arguments = null // @param $ticket = null $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false); // function queue_bind( // @param $queue // @param $exchange // @param $routing_key = "" // @param $nowait = false // @param $arguments = null // @param $ticket = null $immediateChannel->queue_bind($queueName, $exchangeName, $queueName); // node consumes non serialize data while we consume it serialized // that is why the following logic is needed. switch($tube){ case \TYPES\JOB_TUBE_TYPE::ADAM_REPORTS: case \TYPES\JOB_TUBE_TYPE::CREATE_UPDATE_REPORTS: case \TYPES\JOB_TUBE_TYPE::PREPARE_REPORT_DB: $jobPayload = $job; break; default: $jobPayload = serialize($job); } // If we have a delay, then set up a dead letter exchange/queue if ( $delay ){ ///$delayRabbit = new AMQPConnection($this->server, $this->port, $this->user, $this->pass, $this->vhost); $delayChannel = $immediateChannel;//$delayRabbit->channel(); // CakeLog::debug('Setup a delay queue'); if ( isset($options['wait-queue'])){ $queueNameDelayed = $options['wait-queue']; $exchangeNameDelayed = $queueNameDelayed . '_EXCHANGE'; } else{ $queueNameDelayed = $queueName . '_DELAYED'; $exchangeNameDelayed = $exchangeName . '_DELAYED'; } $arguments = array( "x-dead-letter-exchange" => array('S', $exchangeName), "x-dead-letter-routing-key" => array('S', $queueName), "x-message-ttl" => array('I', 3600000),// 1 hour // "x-expires" => array("I", 6000) ); // CakeLog::debug('Creating queue '.$queueNameDelayed); $delayChannel->queue_declare($queueNameDelayed, false, true, false, false, false, $arguments); $delayChannel->exchange_declare($exchangeNameDelayed, 'direct', false, true, false); $delayChannel->queue_bind($queueNameDelayed, $exchangeNameDelayed, $queueNameDelayed); //CakeLog::debug(sprintf('Delayed queue %s delay=%d', $queueNameDelayed, $delay)); } $msg = new \PhpAmqpLib\Message\AMQPMessage($jobPayload, array('content_type' => 'text/plain', 'delivery_mode' => 2, "expiration" => $delay)); // function basic_publish // @param AMQPMessage $msg // @param string $exchange = "" // @param string $routing_key = "" // @param bool $mandatory = false // @param bool $immediate = false // @param null $ticket = null if ( $delay ){ //CakeLog::debug('Publish to '.$queueNameDelayed); $delayChannel->basic_publish($msg, $exchangeNameDelayed, $queueNameDelayed); } else{ //CakeLog::debug('Publish to '.$queueName); $immediateChannel->basic_publish($msg, $exchangeName, $queueName); } //CakeLog::debug($jobPayload); //$immediateChannel->close(); //$conn->close(); } }
Initial URL
Initial Description
This is a class to encapsulate rabbit mq sending and receiving.
Initial Title
RabbitMQ from PHP
Initial Tags
php
Initial Language
PHP