0

0

详解PHP消息队列的实现以及运用(附流程图)

藏色散人

藏色散人

发布时间:2022-10-27 16:00:09

|

5655人浏览过

|

来源于learnku

转载

消息队列的概念、原理、实现方式

概念

  • 队列结构的一个中间件
  • 不需要立即消费消息
  • 由消费者或者订阅者进行按顺序消费

基本的流程图如下所示

  • 流程
    ca434f67a2b74693d8a8db235ff9d3d.jpg

应用场景

  • 冗余
  • 解耦
  • 流量削峰
  • 异步通信

实现方式

  • mysql:可靠、速度慢
  • redis:速度快,对于大消息包处理较慢
  • 消息系统:可靠、专业性强

消息的触发机制

  • 死循环的方式,故障时无法及时恢复
  • 定时任务:压力均分、但是处理量有上限
  • 守护进程的方式

解耦 (订单和配送系统)

  • 架构设计1 采用定时任务的方式
    1013f8bafa2312f0f60c2f8253eb0f2.jpg

  • 使用配送处理系统进行处理时,将当前数据库里需要处理的订单状态更新为2,待处理完成后将状态设为1

  • 可以每次指定更新多少条数据

流量削锋 (redis实现秒杀)

  • 使用队列的数据结构

    • lpush/rpush 将数据放入列表中
    • lpop/rpop 将数据移除列表并获取到移除的值
    • ltrim 保留指定区间内的元素
    • llen 获取列表长度
    • lset 通过索引设置列表的值
    • lindex 通过索引获取列表中的值
    • lrange 获取指定范围的元素
  • 图示如下
    03e20c9fdf5009f0ad7383988c42c6f.jpg

    立即学习PHP免费学习笔记(深入)”;

  • 代码流程如下

    • 秒杀程序将请求写入redis(uid,time)

      Audo Studio
      Audo Studio

      AI音频清洗工具(噪音消除、声音平衡、音量调节)

      下载
    • 检查redis列表存放的长度,超过10个直接舍弃

    • 通过死循环读取redis数据,并存入数据库

      // Spike.php 秒杀程序if(Redis::llen('lottery') < 10){
          // 成功
          Redis::lpush('lottery', $uid.'%'.microtime());}else{
          // 失败}
      // Warehousing.php 入库程序while(true){
          $user = Redis::rpop('lottery');
          if (!$user || $user == 'nil') {
              sleep(2);
              continue;
          }
          $user_arr = explode($user, '%');
          $insert_user = [
              'uid' => $user_arr[0],
              'time' => $user_arr[1]
          ];
          $res = DB::table('lottery_queue')->insert($insert_user);
          if (!$res) {
              Redis::lpush('lottery', $user);
          }}
  • 上述代码中假如并发过大的话会存在超卖的情况,此时可以使用文件锁或者redis分布式锁进行控制,先将商品放入redis list中 使用rpop进行取出,如果取不到则说明已经卖完

  • 具体的思路及伪代码如下

      // 先将商品放入redis中
      $goods_id = 2;
    
      $sql = select id,num from goods where id = $goods_id;
      $res = DB::select($sql);
      if (!empty($res)) {
          // 也可以指定多少件
          Redis::del('lottery_goods' . $goods_id);
          for($i=0;$i<$res['num'];$i++){
              Redis::lpush('lottery_goods . $goods_id', $i);
          }
          LOG::info('商品存入队列成功,数量:' . Redis::llen('lottery_goods . $goods_id'));
      } else {
          LOG::info($goods_id . '加入失败');
      }
      // 开始秒杀
      $count = Redis::rpop('lottery_goods' . $goods_id);
      if (!$count) {
          // 商品已抢完
          ...
      }
    
      // 用户抢购队列
      $user_list = 'user_goods_id_' . $goods_id;
      $user_status = Redis::sismember($user_list, $user_id);
      if ($user_status) {
          // 已抢过
          ...
      }
    
      // 将抢到的放到列表中
      Redis::sadd($user_list, $uid);
      $msg = '用户:' . $uid . '顺序' . $count;
      Log::info($msg);
      // 生成订单等
      ...
      // 减库存
      $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超卖
      DB::update($sql)
      // 抢购成功

rabbitmq

  • 架构及原理
    1fefd7af11b134a7ce893e66a6bd505.jpg
    其中P代表生产者,X为交换机(channal),C代表消费者

  • 简单使用

      // Send.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      // 创建通道
      $channel = $connection->channel();
      // 声明一个队列
      $channel->queue_declare('user_email', false, false, false, false);
      // 制作消息
      $msg = new AMQPMessage('send email');
      // 将消息推送到队列
      $channel->basic_publish($msg, '', 'user_email');
    
      echo '[x] send email';
    
      $channel->close();
      $connection->close();
      // Receive.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      //创建通道
      $channel = $connection->channel();
    
      $channel->queue_declare('user_email', false, false, false, false);
    
      // 当收到消息时的回调函数
      $callback = function($msg){
          //发送邮件
          echo 'Received '.$msg->body.'\n';
      };
    
      $channel->basic_consume('user_email', '', false, true, false, false, $callback);
    
      // 保持监听状态
      while($channel->is_open()){
          $channel->wait();
      }

相关专题

更多
php文件怎么打开
php文件怎么打开

打开php文件步骤:1、选择文本编辑器;2、在选择的文本编辑器中,创建一个新的文件,并将其保存为.php文件;3、在创建的PHP文件中,编写PHP代码;4、要在本地计算机上运行PHP文件,需要设置一个服务器环境;5、安装服务器环境后,需要将PHP文件放入服务器目录中;6、一旦将PHP文件放入服务器目录中,就可以通过浏览器来运行它。

2787

2023.09.01

php怎么取出数组的前几个元素
php怎么取出数组的前几个元素

取出php数组的前几个元素的方法有使用array_slice()函数、使用array_splice()函数、使用循环遍历、使用array_slice()函数和array_values()函数等。本专题为大家提供php数组相关的文章、下载、课程内容,供大家免费下载体验。

1685

2023.10.11

php反序列化失败怎么办
php反序列化失败怎么办

php反序列化失败的解决办法检查序列化数据。检查类定义、检查错误日志、更新PHP版本和应用安全措施等。本专题为大家提供php反序列化相关的文章、下载、课程内容,供大家免费下载体验。

1546

2023.10.11

php怎么连接mssql数据库
php怎么连接mssql数据库

连接方法:1、通过mssql_系列函数;2、通过sqlsrv_系列函数;3、通过odbc方式连接;4、通过PDO方式;5、通过COM方式连接。想了解php怎么连接mssql数据库的详细内容,可以访问下面的文章。

1016

2023.10.23

php连接mssql数据库的方法
php连接mssql数据库的方法

php连接mssql数据库的方法有使用PHP的MSSQL扩展、使用PDO等。想了解更多php连接mssql数据库相关内容,可以阅读本专题下面的文章。

1464

2023.10.23

html怎么上传
html怎么上传

html通过使用HTML表单、JavaScript和PHP上传。更多关于html的问题详细请看本专题下面的文章。php中文网欢迎大家前来学习。

1255

2023.11.03

PHP出现乱码怎么解决
PHP出现乱码怎么解决

PHP出现乱码可以通过修改PHP文件头部的字符编码设置、检查PHP文件的编码格式、检查数据库连接设置和检查HTML页面的字符编码设置来解决。更多关于php乱码的问题详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1569

2023.11.09

php文件怎么在手机上打开
php文件怎么在手机上打开

php文件在手机上打开需要在手机上搭建一个能够运行php的服务器环境,并将php文件上传到服务器上。再在手机上的浏览器中输入服务器的IP地址或域名,加上php文件的路径,即可打开php文件并查看其内容。更多关于php相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1307

2023.11.13

菜鸟裹裹入口以及教程汇总
菜鸟裹裹入口以及教程汇总

本专题整合了菜鸟裹裹入口地址及教程分享,阅读专题下面的文章了解更多详细内容。

0

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PHP自制框架
PHP自制框架

共8课时 | 0.6万人学习

Swoft2.x速学之http api篇课程
Swoft2.x速学之http api篇课程

共16课时 | 0.9万人学习

PHP入门到实战消息队列RabbitMQ
PHP入门到实战消息队列RabbitMQ

共22课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号