0

0

高效管理Pyads通知回调与大规模数据处理

碧海醫心

碧海醫心

发布时间:2025-08-11 18:00:20

|

801人浏览过

|

来源于php中文网

原创

高效管理Pyads通知回调与大规模数据处理

本文旨在探讨在Pyads应用中,如何高效处理来自PLC的ADS通知回调数据,特别是在处理大量、高频数据时,避免使用全局变量,并采用Pythonic的类封装方法管理状态。同时,文章还将深入讲解如何优化字节数据到Python类型的转换,以提升大规模数据处理的性能。

Pyads通知机制与数据处理挑战

pyads库提供了一种通过ads通知(notification)机制从倍福plc高效获取数据的方式。与轮询不同,通知允许plc在数据发生变化时主动向python应用程序发送更新,这对于需要快速响应(如200ms周期)和处理大量数据(如每周期1000个值,累积形成100000个值的数据序列)的场景至关重要。

然而,在处理通知回调时,开发者常面临一个挑战:如何将回调函数内部接收到的数据安全、高效地传递给主程序或其他处理逻辑,同时避免使用不推荐的全局变量。特别是当需要将多次回调的数据累积成一个大型数组时,这个问题变得尤为突出。Pyads的回调函数是独立的,直接在函数内部处理所有逻辑或依赖全局变量会使代码难以维护和扩展。

解决方案一:基于类的封装管理回调与状态

解决上述问题的核心在于利用Python的面向对象特性,将Pyads连接、通知设置以及数据处理逻辑封装到一个类中。这种方法不仅能够避免全局变量的使用,还能更好地管理应用程序的状态和数据流。

1. 类结构设计

通过将Pyads连接作为类的一个成员变量,并在类的初始化方法中设置通知,可以将回调函数定义为类的成员方法。这样,回调函数就可以直接访问和修改类的其他成员变量,从而实现数据的累积和状态的更新。

import pyads
import ctypes
import numpy as np # 提前引入,用于后续数据优化

class PlcDataManager:
    def __init__(self, ams_net_id, plc_ip, ams_port):
        """
        初始化PLC连接和数据管理器。
        """
        self.plc = pyads.Connection(ams_net_id, plc_ip, ams_port)
        self.plc.open()
        self.data_buffer = []  # 用于累积接收到的数据
        self.notification_handles = {} # 存储通知句柄,便于后续释放
        print(f"PLC connection established: {plc_ip}")

        # 示例:初始化心跳计数器
        self.heartbeat_count = 0
        self.plc_status_message = "PLC is running"

        self.setup_notifications()

    def setup_notifications(self):
        """
        设置ADS通知。
        """
        # 示例1:PLC心跳通知
        heartbeat_var_name = 'global.heartbeat' # 假设PLC中有一个心跳变量
        heartbeat_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL))
        handle_heartbeat = self.plc.add_device_notification(
            heartbeat_var_name,
            heartbeat_attr,
            self.on_plc_heartbeat # 回调方法
        )
        self.notification_handles[heartbeat_var_name] = handle_heartbeat
        print(f"Notification set for {heartbeat_var_name}, handle: {handle_heartbeat}")

        # 示例2:PLC状态变化通知 (ADSSTATE_RUN, ADSSTATE_CONFIG等)
        # ADSIGRP_DEVICE_DATA (0xF100), ADSIOFFS_DEVDATA_ADSSTATE (0x0000) 是特殊的地址用于获取PLC状态
        plc_status_addr = (int("0xF100", 16), int("0x0000", 16))
        status_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))
        handle_status = self.plc.add_device_notification(
            plc_status_addr,
            status_attr,
            self.on_plc_status_change # 回调方法
        )
        self.notification_handles["PLC_Status"] = handle_status
        print(f"Notification set for PLC Status, handle: {handle_status}")

        # 示例3:自定义结构体数据通知 (原始问题中的场景)
        # 假设PLC中有一个名为 'global.sample_structure' 的变量
        structure_def = (
            ('nVar', pyads.PLCTYPE_DINT, 1000),
            ('nVar2', pyads.PLCTYPE_DINT, 1000),
            ('nVar3', pyads.PLCTYPE_DINT, 1000),
            ('nVar4', pyads.PLCTYPE_DINT, 1000),
            ('nVar5', pyads.PLCTYPE_DINT, 1000)
        )
        size_of_struct = pyads.size_of_structure(structure_def)
        data_attr = pyads.NotificationAttrib(size_of_struct)
        handle_data = self.plc.add_device_notification(
            'global.sample_structure',
            data_attr,
            self.on_data_received # 回调方法
        )
        self.notification_handles['global.sample_structure'] = handle_data
        print(f"Notification set for global.sample_structure, handle: {handle_data}")
        self.structure_definition = structure_def # 保存结构体定义供回调使用

    def on_plc_heartbeat(self, handle, name, timestamp, value):
        """
        PLC心跳通知回调。
        """
        # value 默认是 ctypes.c_ubyte * size 的字节数组,对于简单类型可以直接判断
        # 或者使用 pyads.dict_from_bytes(value, [(name, pyads.PLCTYPE_BOOL)])
        # 这里假设value直接是布尔值对应的字节
        is_active = bool(value[0]) # 假设PLCBOOL是单字节
        if is_active:
            self.heartbeat_count += 1
            # print(f"Heartbeat received: {self.heartbeat_count}")

    def on_plc_status_change(self, notification, _):
        """
        PLC状态变化通知回调。
        """
        # 使用parse_notification解析通知数据
        *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)
        if value != pyads.ADSSTATE_RUN:
            self.plc_status_message = f"PLC exited run mode (State: {value})"
            print(self.plc_status_message)
        else:
            self.plc_status_message = "PLC is running"
            # print(self.plc_status_message)

    def on_data_received(self, handle, name, timestamp, value):
        """
        接收结构体数据通知回调。
        """
        # 将字节数据转换为字典
        values_dict = pyads.dict_from_bytes(value, self.structure_definition)
        # print(f"Received data: {values_dict['nVar'][0]}...") # 打印第一个元素示例

        # 将数据添加到缓冲区
        # 假设我们只关心 'nVar' 信号的数据,并将其展平
        self.data_buffer.extend(values_dict['nVar'])

        # 检查是否达到100000个值,并进行处理
        if len(self.data_buffer) >= 100000:
            print(f"Collected {len(self.data_buffer)} values. Processing...")
            # 在这里可以对 data_buffer 进行进一步处理,例如保存到文件、进行分析等
            processed_array = np.array(self.data_buffer[:100000]) # 取前100000个
            print(f"First 10 values of processed array: {processed_array[:10]}")
            self.data_buffer = self.data_buffer[100000:] # 清除已处理的数据,保留剩余部分

    def close(self):
        """
        关闭PLC连接并移除所有通知。
        """
        for var_name, handle in self.notification_handles.items():
            try:
                self.plc.del_device_notification(handle)
                print(f"Removed notification for {var_name}, handle: {handle}")
            except Exception as e:
                print(f"Error removing notification for {var_name}: {e}")
        self.plc.close()
        print("PLC connection closed.")

# 示例使用
if __name__ == "__main__":
    # 请替换为你的PLC实际连接参数
    AMS_NET_ID = '192.168.1.100.1.1' # PLC的AMS Net ID
    PLC_IP = '192.168.1.100'        # PLC的IP地址
    AMS_PORT = 851                 # 通常为851

    data_manager = PlcDataManager(AMS_NET_ID, PLC_IP, AMS_PORT)

    try:
        # 在主循环中等待数据或执行其他任务
        # Pyads通知通常在后台线程中运行,因此主线程可以保持活跃
        while True:
            # 可以在这里执行其他非阻塞任务
            # 例如:检查数据缓冲区大小,或者每隔一段时间打印心跳计数
            # print(f"Current heartbeat: {data_manager.heartbeat_count}, PLC Status: {data_manager.plc_status_message}")
            # print(f"Data buffer size: {len(data_manager.data_buffer)}")
            # time.sleep(1) # 模拟主程序忙碌
            pass # 实际应用中会在这里有更复杂的逻辑

    except KeyboardInterrupt:
        print("\nExiting program.")
    finally:
        data_manager.close()

2. 关键点:

  • 封装性 PlcDataManager 类封装了PLC连接、通知设置、数据缓冲区和所有相关的回调方法。
  • 状态管理: self.data_buffer、self.heartbeat_count 等成员变量用于存储和更新应用程序的状态,回调方法直接操作这些变量。
  • 资源管理: 在 __init__ 中打开连接并设置通知,在 close 方法中关闭连接并移除所有通知,确保资源正确释放。
  • 回调签名: Pyads通知回调的签名通常为 (handle, name, timestamp, value)。对于特殊的系统通知(如PLC状态),parse_notification 方法可以更方便地提取值。

解决方案二:优化大规模数据转换性能

当处理的PLC数据量巨大且类型一致时,pyads.dict_from_bytes 这样的逐变量转换方法可能会成为性能瓶颈。Pyads允许用户获取原始的 ctypes 字节数据,然后自行进行更高效的转换,例如利用 numpy 库。

1. return_ctypes=True 的应用

在 add_device_notification 或 read_by_name 方法中,可以通过设置 return_ctypes=True 来获取原始的 ctypes 对象,而不是直接解析为Python字典或基本类型。

万知
万知

万知: 你的个人AI工作站

下载
# 假设 structure_def 和 size_of_struct 已定义
# 修改 add_device_notification 设置,让回调接收原始ctypes数据
handle_data_optimized = self.plc.add_device_notification(
    'global.sample_structure',
    data_attr, # 仍然使用相同的NotificationAttrib
    self.on_data_received_optimized,
    return_ctypes=True # 关键在于这里
)
self.notification_handles['global.sample_structure_optimized'] = handle_data_optimized

# 新的回调方法,处理原始ctypes数据
def on_data_received_optimized(self, handle, name, timestamp, value_ctypes):
    """
    接收原始ctypes数据通知回调,并使用numpy优化转换。
    value_ctypes 是一个 ctypes 数组对象 (e.g., ctypes.c_ubyte * size_of_struct)
    """
    # 假设我们知道数据结构是连续的PLCTYPE_DINT数组
    # 并且我们只关心第一个变量 'nVar',它是一个1000个DINT的数组
    # 获取原始字节数据
    buffer = bytearray(value_ctypes)

    # 计算 'nVar' 在结构体中的偏移和大小
    # 这需要根据 structure_def 手动计算或预先确定
    # 假设 nVar 是结构体的第一个元素,且每个DINT是4字节
    num_elements = 1000
    element_size = ctypes.sizeof(pyads.PLCTYPE_DINT) # 4 bytes for DINT
    nvar_byte_length = num_elements * element_size

    # 使用numpy从字节缓冲区直接创建数组
    # dtype='= 100000:
            print(f"Collected {len(self.data_buffer)} values (optimized). Processing...")
            processed_array = np.array(self.data_buffer[:100000])
            print(f"First 10 values of processed array (optimized): {processed_array[:10]}")
            self.data_buffer = self.data_buffer[100000:]
    except ValueError as e:
        print(f"Error converting data with numpy: {e}")
        # Fallback to slower method or log error
        values_dict = pyads.dict_from_bytes(bytearray(value_ctypes), self.structure_definition)
        self.data_buffer.extend(values_dict['nVar'])

2. numpy 进行数据转换

numpy.frombuffer() 函数能够从一个缓冲区(如 bytearray 或 memoryview)直接解析数据,并以指定的数据类型(dtype)和字节序(endianness)创建 numpy 数组。对于连续存储的同类型数据,这种方法比逐个元素解析快几个数量级。

注意事项:

  • 数据结构理解: 使用 numpy.frombuffer() 需要对PLC中变量的内存布局有清晰的理解,包括每个变量的类型、大小以及在结构体中的偏移量。
  • 字节序(Endianness): PLC通常使用小端字节序(Little-endian),因此在 dtype 中使用
  • 数据类型映射: 确保 numpy 的 dtype 与PLC中的数据类型正确匹配。

总结与最佳实践

通过将Pyads通知回调和数据处理逻辑封装到类中,可以有效地管理应用程序的状态,避免全局变量的混乱,并提高代码的可读性和可维护性。对于需要处理大量、高频PLC数据的场景,进一步利用 pyads 的 return_ctypes=True 选项结合 numpy 进行数据转换,能够显著提升数据处理性能。

其他最佳实践:

  • 错误处理: 在回调函数和主程序中都应包含健壮的错误处理机制,以应对PLC通信中断、数据解析失败等情况。
  • 线程安全: Pyads通知回调通常在单独的线程中执行。如果回调函数需要修改主线程访问的数据,或与其他线程共享资源,务必考虑线程安全(例如使用 threading.Lock)。
  • 通知句柄管理: 妥善存储和管理 add_device_notification 返回的句柄,确保在程序退出时能够正确地调用 del_device_notification 移除通知,释放PLC资源。
  • 日志记录: 详细的日志记录有助于调试和监控应用程序的运行状态,特别是在生产环境中。

遵循这些原则,可以构建出高效、稳定且易于维护的Pyads应用程序,有效应对复杂的工业自动化数据采集需求。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

309

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

222

2025.10.31

go语言 面向对象
go语言 面向对象

本专题整合了go语言面向对象相关内容,阅读专题下面的文章了解更多详细内容。

56

2025.09.05

java面向对象
java面向对象

本专题整合了java面向对象相关内容,阅读专题下面的文章了解更多详细内容。

52

2025.11.27

全局变量怎么定义
全局变量怎么定义

本专题整合了全局变量相关内容,阅读专题下面的文章了解更多详细内容。

81

2025.09.18

python 全局变量
python 全局变量

本专题整合了python中全局变量定义相关教程,阅读专题下面的文章了解更多详细内容。

96

2025.09.18

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

240

2025.06.09

golang结构体方法
golang结构体方法

本专题整合了golang结构体相关内容,请阅读专题下面的文章了解更多。

192

2025.07.04

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

8

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号