0

0

在Laravel WebSockets中实现连接生命周期管理与业务逻辑绑定

心靈之曲

心靈之曲

发布时间:2025-12-01 09:15:16

|

239人浏览过

|

来源于php中文网

原创

在Laravel WebSockets中实现连接生命周期管理与业务逻辑绑定

本教程将指导您如何在laravel websockets中定制连接的生命周期事件,包括连接的打开与关闭。通过扩展默认的websocket处理器,我们将演示如何捕获并关联客户端的业务上下文(如用户id、订单id),从而在连接断开时执行特定的业务逻辑,例如自动解锁正在处理的订单,实现对应用资源的精确状态管理。

引言:定制WebSocket连接的必要性

在实时应用开发中,WebSocket连接不仅仅是数据传输的通道,它更代表了客户端与服务器之间的一种持续性会话。很多业务场景需要我们能够感知并响应这些会话的生命周期事件,例如:

  • 资源锁定与解锁:当用户打开一个订单页面进行编辑时,锁定该订单以防止其他用户同时修改;当用户关闭页面或断开连接时,自动解锁订单。
  • 用户在线状态:实时显示用户的在线或离线状态。
  • 协作编辑:跟踪文档编辑者的连接状态,实现实时协作。

Laravel WebSockets 包(Beyondcode 的 Pusher 替代方案)提供了强大的功能,但其默认处理器可能无法满足所有复杂的业务需求。为了实现上述场景,我们需要扩展其核心处理器,介入连接的打开、关闭及消息处理过程,并注入自定义的业务逻辑。

理解WebSocket处理器

Laravel WebSockets 的核心是 BeyondCode\LaravelWebSockets\WebSockets\WebSocketHandler 接口,它定义了处理WebSocket连接生命周期的方法:

  • onOpen(ConnectionInterface $connection, RequestInterface $request, $appId): 当新的WebSocket连接建立时调用。
  • onClose(ConnectionInterface $connection): 当WebSocket连接关闭时调用。
  • onMessage(ConnectionInterface $connection, MessageInterface $msg): 当收到来自客户端的消息时调用。
  • onError(ConnectionInterface $connection, \Exception $e): 当连接发生错误时调用。
  • onPong(ConnectionInterface $connection, MessageInterface $msg): 当收到客户端的 Pong 消息时调用。

通常,我们不是直接实现 WebSocketHandler 接口,而是继承 BeyondCode\LaravelWebSockets\WebSockets\PusherHandler。PusherHandler 已经实现了 Pusher 协议的诸多细节,我们可以在此基础上重写或增强特定方法,以集成我们的业务逻辑。

创建自定义WebSocket处理器

为了定制连接行为,我们首先需要创建一个自定义的处理器类。我们将使用 SplObjectStorage 来存储与每个连接关联的业务上下文数据,因为 ConnectionInterface 对象是唯一的且可以作为 SplObjectStorage 的键。

首先,在 app/WebSockets 目录下创建 CustomWebSocketHandler.php 文件:

// app/WebSockets/CustomWebSocketHandler.php
<?php

namespace App\WebSockets;

use BeyondCode\LaravelWebSockets\WebSockets\PusherHandler;
use Ratchet\ConnectionInterface;
use Illuminate\Support\Facades\Log;
use SplObjectStorage; // 引入 SplObjectStorage

class CustomWebSocketHandler extends PusherHandler
{
    /**
     * @var SplObjectStorage 存储连接与业务上下文的映射
     */
    protected SplObjectStorage $connections;

    public function __construct()
    {
        parent::__construct();
        $this->connections = new SplObjectStorage();
    }

    /**
     * 当新的WebSocket连接建立时调用。
     *
     * @param ConnectionInterface $connection
     * @param \Psr\Http\Message\RequestInterface $request
     * @param string $appId
     * @return void
     */
    public function onOpen(ConnectionInterface $connection, \Psr\Http\Message\RequestInterface $request, $appId)
    {
        // 调用父类的onOpen方法,确保Pusher协议的正常初始化
        parent::onOpen($connection, $request, $appId);

        Log::info("Connection opened: {$connection->resourceId}");

        // 尝试从请求中获取业务上下文,例如用户ID或订单ID
        // 客户端可以通过WebSocket URL的查询参数传递这些信息
        $queryParams = $request->getQueryParams();
        $userId = $queryParams['user_id'] ?? null;
        $orderId = $queryParams['order_id'] ?? null;

        // 存储连接与业务上下文
        $this->connections->attach($connection, [
            'resource_id' => $connection->resourceId,
            'user_id' => $userId,
            'order_id' => $orderId,
            'connected_at' => now(),
            'channels' => [], // 用于存储该连接订阅的频道
        ]);

        if ($orderId) {
            Log::info("Order {$orderId} is now being processed by user {$userId} via connection {$connection->resourceId}");
            // 触发事件以锁定订单
            event(new \App\Events\OrderLocked($orderId, $userId, $connection->resourceId));
        }
    }

    /**
     * 当收到客户端消息时调用。
     *
     * @param ConnectionInterface $connection
     * @param \Ratchet\MessageComponent\MessageInterface $msg
     * @return void
     */
    public function onMessage(ConnectionInterface $connection, \Ratchet\MessageComponent\MessageInterface $msg)
    {
        parent::onMessage($connection, $msg);

        $payload = json_decode($msg->getPayload());

        // 进一步处理消息,例如当客户端订阅特定频道时更新上下文
        if (isset($payload->event) && $payload->event === 'pusher:subscribe' && isset($payload->data->channel)) {
            $channelName = $payload->data->channel;
            $context = $this->connections->offsetGet($connection);
            $context['channels'][] = $channelName;
            $this->connections->offsetSet($connection, $context); // 更新存储的上下文

            Log::info("Connection {$connection->resourceId} subscribed to channel: {$channelName}");

            // 如果频道名包含订单ID,可以进一步提取并更新
            if (preg_match('/^private-order\.(\d+)$/', $channelName, $matches)) {
                $orderId = $matches[1];
                if ($context['order_id'] !== $orderId) {
                    Log::warning("Connection {$connection->resourceId} subscribed to order {$orderId}, but initial order was {$context['order_id']}");
                    // 可以在这里更新或处理冲突
                }
            }
        }
    }

    /**
     * 当WebSocket连接关闭时调用。
     *
     * @param ConnectionInterface $connection
     * @return void
     */
    public function onClose(ConnectionInterface $connection)
    {
        Log::info("Connection closed: {$connection->resourceId}");

        // 确保该连接存在于我们的存储中
        if ($this->connections->contains($connection)) {
            $context = $this->connections->offsetGet($connection);

            $userId = $context['user_id'];
            $orderId = $context['order_id'];

            if ($orderId) {
                Log::info("Order {$orderId} is no longer processed by user {$userId} via connection {$connection->resourceId}");
                // 触发事件以解锁订单
                event(new \App\Events\OrderUnlocked($orderId, $userId, $connection->resourceId));
            }

            // 清理连接上下文
            $this->connections->detach($connection);
        }

        // 调用父类的onClose方法
        parent::onClose($connection);
    }

    /**
     * 当连接发生错误时调用。
     *
     * @param ConnectionInterface $connection
     * @param \Exception $e
     * @return void
     */
    public function onError(ConnectionInterface $connection, \Exception $e)
    {
        Log::error("Connection error for {$connection->resourceId}: " . $e->getMessage());
        parent::onError($connection, $e);
    }
}

代码说明:

  1. SplObjectStorage $connections: 这是关键,用于存储每个 ConnectionInterface 对象及其关联的业务数据。
  2. onOpen 方法:
    • 在调用 parent::onOpen 之后,我们从 RequestInterface $request 的查询参数中尝试提取 user_id 和 order_id。
    • 将这些信息与 connection 对象一起存储到 $this->connections 中。
    • 如果成功获取到 orderId,则触发一个 OrderLocked 事件,通知应用层锁定该订单。
  3. onMessage 方法 (可选但推荐):
    • 此方法用于处理客户端发送的所有消息。在这里,我们特别关注 pusher:subscribe 事件。
    • 当客户端订阅一个频道时,我们可以解析频道名称(例如 private-order.123),从中提取更具体的业务ID,并更新 SplObjectStorage 中该连接的上下文信息。这在初始 onOpen 无法获得所有上下文时非常有用。
  4. onClose 方法:
    • 在连接关闭时,我们通过 ConnectionInterface $connection 从 $this->connections 中检索之前存储的业务上下文。
    • 根据上下文中的 order_id,触发一个 OrderUnlocked 事件,通知应用层解锁订单。
    • 最后,从 $this->connections 中移除该连接的上下文,防止内存泄漏。
  5. onError 方法: 记录错误信息,以便调试。

定义业务事件

为了解耦 WebSocket 处理器与具体的业务逻辑,我们推荐使用 Laravel 事件。

ModelGate
ModelGate

一站式AI模型管理与调用工具

下载

OrderLocked 事件:

// app/Events/OrderLocked.php
<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class OrderLocked
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public $orderId;
    public $userId;
    public $connectionId;

    public function __construct($orderId, $userId, $connectionId)
    {
        $this->orderId = $orderId;
        $this->userId = $userId;
        $this->connectionId = $connectionId;
    }
}

OrderUnlocked 事件:

// app/Events/OrderUnlocked.php
<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class OrderUnlocked
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public $orderId;
    public $userId;
    public $connectionId;

    public function __construct($orderId, $userId, $connectionId)
    {
        $this->orderId = $orderId;
        $this->userId = $userId;
        $this->connectionId = $connectionId;
    }
}

然后,您可以在 app/Listeners 中创建相应的监听器来处理这些事件,例如更新数据库中的订单状态。

注册自定义处理器

最后一步是告诉 Laravel WebSockets 使用您的自定义处理器。修改 config/websockets.php 文件:

// config/websockets.php

return [
    // ... 其他配置

    'handler' => \App\WebSockets\CustomWebSocketHandler::class,

    // ... 其他配置
];

客户端实现

为了让 onOpen 方法能够获取到 user_id 和 order_id,客户端在建立 WebSocket 连接时需要将这些信息作为查询参数传递。

使用 Laravel Echo 和 JavaScript:

import Echo from 'laravel-echo';

window.Pusher = require('pusher-js');

// 假设您在后端视图中将这些ID传递给前端
const currentUserId = @json(auth()->id());
const currentOrderId = @json($order->id ?? null); // 如果在订单页面

window.Echo = new Echo({
    broadcaster: 'pusher',
    key: process.env.MIX_PUSHER_APP_KEY,
    wsHost: window.location.hostname,
    wsPort: 6001

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
laravel组件介绍
laravel组件介绍

laravel 提供了丰富的组件,包括身份验证、模板引擎、缓存、命令行工具、数据库交互、对象关系映射器、事件处理、文件操作、电子邮件发送、队列管理和数据验证。想了解更多laravel的相关内容,可以阅读本专题下面的文章。

340

2024.04.09

laravel中间件介绍
laravel中间件介绍

laravel 中间件分为五种类型:全局、路由、组、终止和自定。想了解更多laravel中间件的相关内容,可以阅读本专题下面的文章。

293

2024.04.09

laravel使用的设计模式有哪些
laravel使用的设计模式有哪些

laravel使用的设计模式有:1、单例模式;2、工厂方法模式;3、建造者模式;4、适配器模式;5、装饰器模式;6、策略模式;7、观察者模式。想了解更多laravel的相关内容,可以阅读本专题下面的文章。

773

2024.04.09

thinkphp和laravel哪个简单
thinkphp和laravel哪个简单

对于初学者来说,laravel 的入门门槛较低,更易上手,原因包括:1. 更简单的安装和配置;2. 丰富的文档和社区支持;3. 简洁易懂的语法和 api;4. 平缓的学习曲线。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

385

2024.04.10

laravel入门教程
laravel入门教程

本专题整合了laravel入门教程,想了解更多详细内容,请阅读专题下面的文章。

141

2025.08.05

laravel实战教程
laravel实战教程

本专题整合了laravel实战教程,阅读专题下面的文章了解更多详细内容。

85

2025.08.05

laravel面试题
laravel面试题

本专题整合了laravel面试题相关内容,阅读专题下面的文章了解更多详细内容。

80

2025.08.05

PHP高性能API设计与Laravel服务架构实践
PHP高性能API设计与Laravel服务架构实践

本专题围绕 PHP 在现代 Web 后端开发中的高性能实践展开,重点讲解基于 Laravel 框架构建可扩展 API 服务的核心方法。内容涵盖路由与中间件机制、服务容器与依赖注入、接口版本管理、缓存策略设计以及队列异步处理方案。同时结合高并发场景,深入分析性能瓶颈定位与优化思路,帮助开发者构建稳定、高效、易维护的 PHP 后端服务体系。

568

2026.03.04

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PHP课程
PHP课程

共137课时 | 13.5万人学习

JavaScript ES5基础线上课程教学
JavaScript ES5基础线上课程教学

共6课时 | 11.3万人学习

PHP新手语法线上课程教学
PHP新手语法线上课程教学

共13课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号