0

0

Swoole如何实现一个简单的MQTT服务器

裘德小鎮的故事

裘德小鎮的故事

发布时间:2025-09-30 21:30:04

|

750人浏览过

|

来源于php中文网

原创

Swoole可通过TCP服务器实现MQTT协议解析,核心包括处理CONNECT、PUBLISH、SUBSCRIBE等报文,管理客户端订阅关系与消息转发,需手动解析变长头部与主题长度,支持PINGREQ心跳与连接状态维护,配合mosquitto工具测试基础通信,适用于轻量级物联网场景,但生产环境需扩展QoS、TLS、持久化等机制。

swoole如何实现一个简单的mqtt服务器

Swoole 是一个强大的 PHP 扩展,支持异步、并发和长连接网络编程,非常适合用来构建自定义协议的服务器,比如 MQTT 服务。虽然 Swoole 不直接内置 MQTT 协议解析,但你可以基于其 TCP 或 WebSocket 服务器功能,手动实现一个轻量级的 MQTT 服务器。

理解 MQTT 协议基础

MQTT 是一种基于发布/订阅模式的轻量级消息协议,常用于物联网场景。它使用二进制报文格式,通过 TCP 传输。关键报文类型包括:CONNECT、PUBLISH、SUBSCRIBE、PINGREQ、DISCONNECT 等。

要实现简单 MQTT 服务器,你至少需要:

  • 解析 MQTT 固定头部(1字节控制类型+标志 + 变长长度)
  • 解析 CONNECT 报文中的客户端 ID、用户名、密码等
  • 支持客户端订阅主题(SUBSCRIBE)
  • 支持消息发布(PUBLISH)并转发给订阅者
  • 维护客户端连接状态和订阅关系

使用 Swoole 创建 TCP 服务器

下面是一个基于 Swoole 实现的极简 MQTT 服务器骨架:

$server = new Swoole\Server('0.0.0.0', 1883);

// 存储客户端订阅的主题,格式:topic => [fd1, fd2, ...]
$subscribers = [];

$server->on('connect', function ($serv, $fd) {
    echo "Client: {$fd} connected.\n";
});

$server->on('receive', function ($serv, $fd, $reactorId, $data) use (&$subscribers) {
    if (strlen($data) < 2) return;

    $firstByte = ord($data[0]);
    $msgType = ($firstByte & 0xF0) >> 4;
    // 解析剩余长度(MQTT 变长编码)
    $i = 1;
    $remainingLength = 0;
    $multiplier = 1;
    do {
        if ($i >= strlen($data)) return;
        $byte = ord($data[$i]);
        $remainingLength += ($byte & 127) * $multiplier;
        $multiplier *= 128;
        $i++;
    } while (($byte & 128) > 0);

    $payload = substr($data, $i, $remainingLength);

    switch ($msgType) {
        case 1: // CONNECT
            handleConnect($serv, $fd, $payload);
            break;
        case 3: // PUBLISH
            handlePublish($serv, $payload, $subscribers);
            break;
        case 8: // SUBSCRIBE
            handleSubscribe($serv, $fd, $payload, $subscribers);
            break;
        case 12: // PINGREQ
            $serv->send($fd, chr(0xC0) . chr(0x00)); // 返回 PINGRESP
            break;
        case 14: // DISCONNECT
            $serv->close($fd);
            break;
    }
});

$server->on('close', function ($serv, $fd) use (&$subscribers) {
    // 清理该客户端的订阅
    foreach ($subscribers as $topic => $clients) {
        $subscribers[$topic] = array_filter($clients, function ($clientFd) use ($fd) {
            return $clientFd !== $fd;
        });
    }
    echo "Client: {$fd} closed.\n";
});

$server->start();

function handleConnect($serv, $fd, $payload) {
    // 跳过协议名(通常是'MQTT')
    $offset = 0;
    $protocolNameLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2 + $protocolNameLen;

    $protocolLevel = $payload[$offset];
    $offset += 1;

    $connectFlags = ord($payload[$offset]);
    $offset += 1;

    $keepAlive = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;

    // 解析 Client ID
    $clientIdLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $clientId = substr($payload, $offset, $clientIdLen);
    $offset += $clientIdLen;

    echo "Client ID: {$clientId} connected.\n";

    // 发送 CONNACK (连接确认)
    $connack = chr(0x20) . chr(0x02) . chr(0x00) . chr(0x00); // 0x00 表示连接成功
    $serv->send($fd, $connack);
}

function handleSubscribe($serv, $fd, $payload, &$subscribers) {
    $packetId = (ord($payload[0]) << 8) | ord($payload[1]);

    $offset = 2;
    $topics = [];
    while ($offset < strlen($payload)) {
        $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
        $offset += 2;
        $topic = substr($payload, $offset, $topicLen);
        $offset += $topicLen;
        $qos = $payload[$offset] & 0x03;
        $offset += 1;

        $topics[] = $topic;
        if (!isset($subscribers[$topic])) {
            $subscribers[$topic] = [];
        }
        $subscribers[$topic][] = $fd;
    }

    // 返回 SUBACK
    $suback = chr(0x90) . chr(count($topics) + 2) . chr($packetId >> 8) . chr($packetId & 0xFF);
    foreach ($topics as $t) {
        $suback .= chr(0x01); // 返回 QoS 1
    }
    $serv->send($fd, $suback);
}

function handlePublish($serv, $payload, &$subscribers) {
    $offset = 0;
    $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $topic = substr($payload, $offset, $topicLen);
    $offset += $topicLen;

    $message = substr($payload, $offset);

    // 查找订阅了该主题的客户端并发送消息
    if (isset($subscribers[$topic])) {
        $pubMsg = chr(0x30) . pack('C', 2 + $topicLen + strlen($message))
                . chr($topicLen >> 8) . chr($topicLen & 0xFF)
                . $topic . $message;

        foreach ($subscribers[$topic] as $clientFd) {
            $serv->send($clientFd, $pubMsg);
        }
    }
}

测试你的 MQTT 服务器

使用 mosquitto_pubmosquitto_sub 工具测试:

易想商务网
易想商务网

YxB2B商务网是易想网络旗下的门户型B2B行业网站系统,采用先进的标签技术和静态生成技术,通过网站后台管理轻松实现网站前台多种风格和会员网站多风格,让每一个只要懂得简单网页制作常识的网友,轻松制作出精美专业的的行业商务网站系统。系统高速、稳定、安全,完全仿阿里巴巴功能设计,有供应信息、求购信息、产品库、公司库、专项商机、行业信息、展会服务、人才市场、会员助手、网商博客、商友论坛、全方位搜索等栏目

下载
  • mosquitto_sub -h 127.0.0.1 -p 1883 -t 'test/topic'
  • mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'Hello Swoole MQTT'

如果一切正常,订阅端会收到消息。

注意事项与扩展建议

这个实现非常基础,仅用于学习。生产环境需考虑:

  • 完整的 MQTT 协议解析(QoS 0/1/2、遗嘱消息、Clean Session 等)
  • 心跳机制和超时断开
  • 主题通配符(+ 和 #)支持
  • 持久化会话和消息队列
  • 安全性:认证、加密(TLS)
  • 集群和负载均衡

也可以考虑基于开源项目如 EMQX 或使用 Swoole 配合 Workerman + GatewayWorker 构建更复杂的 MQTT 代理。

基本上就这些。用 Swoole 写 MQTT 服务器核心是解析二进制协议并管理连接状态,难点在协议细节处理。

相关专题

更多
php文件怎么打开
php文件怎么打开

打开php文件步骤:1、选择文本编辑器;2、在选择的文本编辑器中,创建一个新的文件,并将其保存为.php文件;3、在创建的PHP文件中,编写PHP代码;4、要在本地计算机上运行PHP文件,需要设置一个服务器环境;5、安装服务器环境后,需要将PHP文件放入服务器目录中;6、一旦将PHP文件放入服务器目录中,就可以通过浏览器来运行它。

2546

2023.09.01

php怎么取出数组的前几个元素
php怎么取出数组的前几个元素

取出php数组的前几个元素的方法有使用array_slice()函数、使用array_splice()函数、使用循环遍历、使用array_slice()函数和array_values()函数等。本专题为大家提供php数组相关的文章、下载、课程内容,供大家免费下载体验。

1612

2023.10.11

php反序列化失败怎么办
php反序列化失败怎么办

php反序列化失败的解决办法检查序列化数据。检查类定义、检查错误日志、更新PHP版本和应用安全措施等。本专题为大家提供php反序列化相关的文章、下载、课程内容,供大家免费下载体验。

1501

2023.10.11

php怎么连接mssql数据库
php怎么连接mssql数据库

连接方法:1、通过mssql_系列函数;2、通过sqlsrv_系列函数;3、通过odbc方式连接;4、通过PDO方式;5、通过COM方式连接。想了解php怎么连接mssql数据库的详细内容,可以访问下面的文章。

952

2023.10.23

php连接mssql数据库的方法
php连接mssql数据库的方法

php连接mssql数据库的方法有使用PHP的MSSQL扩展、使用PDO等。想了解更多php连接mssql数据库相关内容,可以阅读本专题下面的文章。

1417

2023.10.23

html怎么上传
html怎么上传

html通过使用HTML表单、JavaScript和PHP上传。更多关于html的问题详细请看本专题下面的文章。php中文网欢迎大家前来学习。

1234

2023.11.03

PHP出现乱码怎么解决
PHP出现乱码怎么解决

PHP出现乱码可以通过修改PHP文件头部的字符编码设置、检查PHP文件的编码格式、检查数据库连接设置和检查HTML页面的字符编码设置来解决。更多关于php乱码的问题详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1446

2023.11.09

php文件怎么在手机上打开
php文件怎么在手机上打开

php文件在手机上打开需要在手机上搭建一个能够运行php的服务器环境,并将php文件上传到服务器上。再在手机上的浏览器中输入服务器的IP地址或域名,加上php文件的路径,即可打开php文件并查看其内容。更多关于php相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1306

2023.11.13

C++ 单元测试与代码质量保障
C++ 单元测试与代码质量保障

本专题系统讲解 C++ 在单元测试与代码质量保障方面的实战方法,包括测试驱动开发理念、Google Test/Google Mock 的使用、测试用例设计、边界条件验证、持续集成中的自动化测试流程,以及常见代码质量问题的发现与修复。通过工程化示例,帮助开发者建立 可测试、可维护、高质量的 C++ 项目体系。

3

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PHP课程
PHP课程

共137课时 | 8.7万人学习

JavaScript ES5基础线上课程教学
JavaScript ES5基础线上课程教学

共6课时 | 7万人学习

PHP新手语法线上课程教学
PHP新手语法线上课程教学

共13课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号