微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

mq队列Laravel集成rabbitMQ

Laravel集成rabbitMQ

1.安装扩展注意mq扩展和PHP版本laravel6和laravel7可以安装mq10

composer require vladimir-yuldashev/laravel-queue-rabbitmq "10.X" --ignore-platform-reqs

2.配置文件 queue配置文件种加入

'connections' => [
    'rabbitmq' => [
        'driver' => 'rabbitmq','queue' => env('RABBITMQ_QUEUE','default'),'connection' => PHPAmqpLib\Connection\AMQPLazyConnection::class,'hosts' => [
            [
                'host' => env('RABBITMQ_HOST','127.0.0.1'),'port' => env('RABBITMQ_PORT',5672),'user' => env('RABBITMQ_USER','guest'),'password' => env('RABBITMQ_PASSWORD','vhost' => env('RABBITMQ_VHOST','/'),],'options' => [
            'ssl_options' => [
                'cafile' => env('RABBITMQ_SSL_CAFILE',null),'local_cert' => env('RABBITMQ_SSL_LOCALCERT','local_key' => env('RABBITMQ_SSL_LOCALKEY','verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER',true),'passphrase' => env('RABBITMQ_SSL_PAsspHRASE',/*
         * Set to "horizon" if you wish to use Laravel Horizon.
         */
        'worker' => env('RABBITMQ_WORKER',]
]

3.env文件配置

QUEUE_DRIVER=rabbitmq #配置名称
RABBITMQ_HOST=容器ip地址
RABBITMQ_PORT=5672 #端口
RABBITMQ_USER=admin #mq账号
RABBITMQ_PASSWORD=admin#mq密码
RABBITMQ_VHOST=my_vhost #虚拟机的意思
RABBITMQ_QUEUE=队列名称 #队列名称
修改QUEUE_CONNECTION=rabbitmq

4.封装mq操作类

<?PHP
namespace App\Http\Service;
use PHPAmqpLib\Connection\AMQPStreamConnection;
use PHPAmqpLib\Message\AMQPMessage;
class RabbitmqService
{      //链接mq
    private static function getConnect()
    {
        $config = [
            'host' => env('RABBITMQ_HOST',];
        return new AMQPStreamConnection($config["host"],$config["port"],$config["user"],$config["password"],$config["vhost"]);
    }

    /**
     * @param $queue
     * @param $messageBody
     * @param string $exchange
     * @throws \Exception
     * 推送
     */
    public static function push($queue,$messageBody,$exchange = 'router')
    {
        $connection = self::getConnect();
        $channel = $connection->channel();
        $channel->queue_declare($queue,false,true,false);        //创建交换机路由策 direct严格模式 工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
        $channel->exchange_declare($exchange,'direct',false);
        $channel->queue_bind($queue,$exchange); // 队列和交换器绑定
        $message = new AMQPMessage($messageBody,array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message,$exchange); // 推送消息
        $channel->close();
        $connection->close();
    }

    /**
     * @param $queue
     * @param $callback
     * @param string $exchange
     * @throws \Exception
     * 消费
     */
    public static function pop($queue,$callback,$exchange = 'router')
    {
        $connection = self::getConnect();
        $channel = $connection->channel();
        $message = $channel->basic_get($queue); //取出消息
        $res = $callback($message->body);
        if($res){
            $channel->basic_ack($message->getDeliveryTag());//ack确认
        }
        $channel->close();
        $connection->close();
        return true;
    }
}

5.创建laravel  Job类

<?PHP
namespace App\Jobs;
use App\Http\Service\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class UpdateProduct implements ShouldQueue
{
    use dispatchable,InteractsWithQueue,Queueable,SerializesModels;

    protected $productKey;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($data)
    {
        $this->delay = 10;//延时10秒看效果如果不延时可不写
        //推送修改到对列里
        $this->productKey = "L::product::info::".$data->id;
        Log::info('L'.$this->productKey);
        Log::info('L'.json_encode($data));
        RabbitmqService::push('update_queue',$data);
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //消费对列 update_queue队列名称
        RabbitmqService::pop('update_queue',function ($message){
            Log::info('L::product::info::------'.$this->productKey);
            Log::info('L::product::info::msg------'.json_encode($message));
            $product = //这里根据自己的业务执行相应的逻辑返回true 和 false
            if (!$product){
                return false;
            }
            return true;
        });

    }

    public function Failed(\Exception $exception)
    {
        print_r($exception->getMessage());
    }
}

6.业务调用任务类

  $this->dispatch(new UpdateProduct(传递对应的参数));

7.监听队列执行

PHP artisan queue:work

可使用第三方工具监听队列

https://www.xiaoshu168.com/php/288.html

安装mq案例

https://www.xiaoshu168.com/docker/364.html

如果PHP没有安装amqp扩展参考文章

https://www.xiaoshu168.com/docker/366.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐