Return to Snippet

Revision: 66967
at July 26, 2014 05:31 by ryantxr


Initial Code
<?php
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitQueue 
{
    public $rabbit;

    public function connect($server = RABBIT_SERVER, $port = RABBIT_PORT, $user = RABBIT_USER, $pass = RABBIT_PASS, $vhost = RABBIT_VHOST) {
        // TODO - this can be cleaned up once Cake supports namespaces...
        $path = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Connection' . DS;
        $channel = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Channel' . DS;
        $wireio = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Wire' . DS . 'IO' . DS;
        $wire = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Wire' . DS;
        $helper = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Helper' . DS;
        $helperprotocol = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Helper' . DS . 'Protocol' . DS;
        $exception = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Exception' . DS;
        $message = APP . 'Vendor' . DS . 'php-amqplib-master' . DS  . 'PhpAmqpLib' . DS . 'Message' . DS;
        include_once($exception . 'AMQPExceptionInterface.php');
        include_once($exception . 'AMQPRuntimeException.php');
        include_once($exception . 'AMQPProtocolException.php');
        include_once($exception . 'AMQPProtocolChannelException.php');
        include_once($wire . 'GenericContent.php');
        include_once($message . 'AMQPMessage.php');

        include_once($helperprotocol . 'MethodMap091.php');
        include_once($helperprotocol . 'Wait091.php');
        include_once($helperprotocol . 'Protocol091.php');
        include_once($wire . 'Constants091.php');
        include_once($helper . 'MiscHelper.php');
        include_once($wire . 'AMQPWriter.php');
        include_once($wire . 'AMQPReader.php');
        include_once($wireio . 'AbstractIO.php');
        include_once($wireio . 'StreamIO.php');
        include_once($channel . 'AbstractChannel.php');
        include_once($channel . 'AMQPChannel.php');
        include_once($path . 'AbstractConnection.php');
        include_once($path . 'AMQPStreamConnection.php');
        include_once($path . 'AMQPConnection.php');
        
        $this->server = $server; $this->port = $port; $this->user = $user; $this->pass = $pass; $this->vhost = $vhost;
        $this->rabbit = new AMQPConnection($server, $port, $user, $pass, $vhost);
    }

    public function consumeSetup($queueName, $callback, $exchangeName = RABBIT_ADAM_EXCHANGE) {
        $immediateChannel = $this->rabbit->channel();
        $immediateChannel->queue_declare($queueName, false, true, false, false);
        $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);
        $immediateChannel->queue_bind($queueName, $exchangeName, $queueName);
        $immediateChannel->basic_consume($queueName, 'consumer122', false, false, false, false, $callback);
        return $immediateChannel;
    }

    public function delete($message) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }

    public function getJob($message) {
        return unserialize($message->body);
    }

    public function publish($queueName, $job, $delay, $exchangeName = RABBIT_ADAM_EXCHANGE, $options=array()) {
        $immediateChannel = $this->rabbit->channel();

        // class AMQPChannel ...
        // function queue_declare(
        // @param $queue = ""
        // @param $passive = false
        // @param $durable = false
        // @param $exclusive = false
        // @param $auto_delete = true
        // @param $nowait = false
        // @param $arguments = null
        // @param $ticket = null
        $immediateChannel->queue_declare($queueName, false, true, false, false);

        // function exchange_declare(
        // @param $exchange
        // @param $type
        // @param $passive = false
        // @param $durable = false
        // @param $auto_delete = true
        // @param $internal = false
        // @param $nowait = false
        // @param $arguments = null
        // @param $ticket = null
        $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);

        // function queue_bind(
        // @param $queue
        // @param $exchange
        // @param $routing_key = ""
        // @param $nowait = false
        // @param $arguments = null
        // @param $ticket = null
        $immediateChannel->queue_bind($queueName, $exchangeName, $queueName);

        // node consumes non serialize data while we consume it serialized
        // that is why the following logic is needed.
        switch($tube){
            case \TYPES\JOB_TUBE_TYPE::ADAM_REPORTS:
            case \TYPES\JOB_TUBE_TYPE::CREATE_UPDATE_REPORTS:
            case \TYPES\JOB_TUBE_TYPE::PREPARE_REPORT_DB:
                $jobPayload = $job;
                break;
            default:
                $jobPayload = serialize($job);
        }
        
        // If we have a delay, then set up a dead letter exchange/queue
        if ( $delay ){
            ///$delayRabbit = new AMQPConnection($this->server, $this->port, $this->user, $this->pass, $this->vhost);

            $delayChannel = $immediateChannel;//$delayRabbit->channel();
            // CakeLog::debug('Setup a delay queue');
            if ( isset($options['wait-queue'])){
                $queueNameDelayed = $options['wait-queue'];
                $exchangeNameDelayed = $queueNameDelayed . '_EXCHANGE';
            }
            else{
                $queueNameDelayed = $queueName . '_DELAYED';
                $exchangeNameDelayed = $exchangeName . '_DELAYED';
            }
            $arguments = array(
                "x-dead-letter-exchange" => array('S', $exchangeName),
                "x-dead-letter-routing-key" => array('S', $queueName),
                "x-message-ttl" => array('I', 3600000),// 1 hour
                // "x-expires" => array("I", 6000)
                );
            // CakeLog::debug('Creating queue '.$queueNameDelayed);
            $delayChannel->queue_declare($queueNameDelayed, false, true, false, false, false, $arguments);
            $delayChannel->exchange_declare($exchangeNameDelayed, 'direct', false, true, false);
            $delayChannel->queue_bind($queueNameDelayed, $exchangeNameDelayed, $queueNameDelayed);
            //CakeLog::debug(sprintf('Delayed queue %s delay=%d', $queueNameDelayed, $delay));
        }

        $msg = new \PhpAmqpLib\Message\AMQPMessage($jobPayload, array('content_type' => 'text/plain', 'delivery_mode' => 2, "expiration" => $delay));

        // function basic_publish
        // @param AMQPMessage $msg
        // @param string $exchange = ""
        // @param string $routing_key = ""
        // @param bool $mandatory = false
        // @param bool $immediate = false
        // @param null $ticket = null
        if ( $delay ){
            //CakeLog::debug('Publish to '.$queueNameDelayed);
            $delayChannel->basic_publish($msg, $exchangeNameDelayed, $queueNameDelayed);
        }
        else{
            //CakeLog::debug('Publish to '.$queueName);
            $immediateChannel->basic_publish($msg, $exchangeName, $queueName);
        }

        //CakeLog::debug($jobPayload);

        //$immediateChannel->close();
        //$conn->close();
    }
}

Initial URL

                                

Initial Description
This is a class to encapsulate rabbit mq sending and receiving.

Initial Title
RabbitMQ from PHP

Initial Tags
php

Initial Language
PHP