0

0

教你使用mixphp打造多进程异步邮件发送

藏色散人

藏色散人

发布时间:2020-08-20 13:25:11

|

2858人浏览过

|

来源于segmentfault

转载

注意:这个是 mixphp v1 的范例

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

推荐:《PHP视频教程

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

  • 异步
  • 消息队列
  • 多进程
  • 守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

立即学习PHP免费学习笔记(深入)”;

  • redis
  • rabbitmq
  • kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

kimi.ai
kimi.ai

Kimi.ai 是月之暗面(Moonshot AI)公司推出的AI智能聊天机器人,能进行智能闲聊、解答问题,提供生活AI助手服务等。

下载

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = 'smtpdm.aliyun.com';
    const PORT = 465;
    const SECURITY = 'ssl';
    const USERNAME = '****@email.***.com';
    const PASSWORD = '****';

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                'class'         => 'mix\task\TaskExecutor',
                // 服务名称
                'name'          => "mix-daemon: {$this->programName}",
                // 执行类型
                'type'          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                'mode'          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                'leftProcess'   => 1,
                // 中进程数
                'centerProcess' => 5,
                // 任务超时时间 (秒)
                'timeout'       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on('LeftStart', [$this, 'onLeftStart']);
        $service->on('CenterStart', [$this, 'onCenterStart']);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop('queue:email', 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data['subject']))
            ->setFrom([self::USERNAME => '**网'])
            ->setTo($data['to'])
            ->setBody($data['body']);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

测试

1.在 shell 中启动 push 常驻程序。        
[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
1.调用接口往消息队列投放任务。

此时 shell 终端将打印:

企业微信截图_15979008134242.png

成功收到测试邮件:

企业微信截图_15979008195228.png

MixPHP

GitHub: https://github.com/mix-php/mix
官网:http://www.mixphp.cn/

PHP速学教程(入门到精通)
PHP速学教程(入门到精通)

PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
composer是什么插件
composer是什么插件

Composer是一个PHP的依赖管理工具,它可以帮助开发者在PHP项目中管理和安装依赖的库文件。Composer通过一个中央化的存储库来管理所有的依赖库文件,这个存储库包含了各种可用的依赖库的信息和版本信息。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

161

2023.12.25

什么是中间件
什么是中间件

中间件是一种软件组件,充当不兼容组件之间的桥梁,提供额外服务,例如集成异构系统、提供常用服务、提高应用程序性能,以及简化应用程序开发。想了解更多中间件的相关内容,可以阅读本专题下面的文章。

181

2024.05.11

Golang 中间件开发与微服务架构
Golang 中间件开发与微服务架构

本专题系统讲解 Golang 在微服务架构中的中间件开发,包括日志处理、限流与熔断、认证与授权、服务监控、API 网关设计等常见中间件功能的实现。通过实战项目,帮助开发者理解如何使用 Go 编写高效、可扩展的中间件组件,并在微服务环境中进行灵活部署与管理。

224

2025.12.18

数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

312

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

223

2025.10.31

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

97

2026.02.12

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1708

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

549

2025.10.17

Golang 测试体系与代码质量保障:工程级可靠性建设
Golang 测试体系与代码质量保障:工程级可靠性建设

Go语言测试体系与代码质量保障聚焦于构建工程级可靠性系统。本专题深入解析Go的测试工具链(如go test)、单元测试、集成测试及端到端测试实践,结合代码覆盖率分析、静态代码扫描(如go vet)和动态分析工具,建立全链路质量监控机制。通过自动化测试框架、持续集成(CI)流水线配置及代码审查规范,实现测试用例管理、缺陷追踪与质量门禁控制,确保代码健壮性与可维护性,为高可靠性工程系统提供质量保障。

24

2026.02.28

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号