0

0

并行处理视频流:使用 PySpark 进行大规模视频分析

DDD

DDD

发布时间:2025-08-19 23:08:04

|

575人浏览过

|

来源于php中文网

原创

并行处理视频流:使用 pyspark 进行大规模视频分析

本文档介绍了如何使用 PySpark 并行处理多个视频文件,并进行人脸识别等视频分析任务。我们将探讨如何利用 Spark 的分布式计算能力,高效地从视频中提取帧,检测人脸,并进行人脸追踪。本文提供了详细的代码示例和步骤,帮助读者理解和应用 PySpark 进行大规模视频处理。

环境配置

首先,确保你的环境中安装了必要的 Python 库和 FFmpeg。

  1. 安装 Python 库:

    pip install ffmpeg-python
    pip install face-recognition
    conda install -c conda-forge opencv
  2. 安装 FFmpeg:

    确保你的系统上安装了 FFmpeg。具体的安装方法取决于你的操作系统。例如,在 Ubuntu 上可以使用以下命令:

    sudo apt-get update
    sudo apt-get install ffmpeg

PySpark 代码实现

以下代码展示了如何使用 PySpark 并行读取视频文件,提取帧,并进行人脸检测和追踪。

from pyspark import SQLContext, SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 配置 Spark
conf = SparkConf().setAppName("myApp").setMaster("local[40]")
spark = SparkSession.builder.master("local[40]").config("spark.driver.memory", "30g").getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

import cv2
import os
import uuid
import ffmpeg
import subprocess
import numpy as np

from scipy.optimize import linear_sum_assignment
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, FloatType,
                               ArrayType, BinaryType,
                               MapType, DoubleType, StringType)

from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row, DataFrame, SparkSession

import pathlib

# 指定视频文件目录
input_dir = "../data/video_files/faces/"
pathlist = list(pathlib.Path(input_dir).glob('*.mp4'))
pathlist = [Row(str(ele)) for ele in pathlist]

# 创建 DataFrame
column_name = ["video_uri"]
df = sqlContext.createDataFrame(data=pathlist, schema=column_name)

print("Initial dataframe")
df.show(10, truncate=False)

# 定义视频元数据 Schema
video_metadata = StructType([
    StructField("width", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("num_frames", IntegerType(), False),
    StructField("duration", FloatType(), False)
])

# 定义 Shot Schema
shots_schema = ArrayType(
    StructType([
        StructField("start", FloatType(), False),
        StructField("end", FloatType(), False)
    ]))

# 定义 UDF:提取视频元数据
@F.udf(returnType=video_metadata)
def video_probe(uri):
    probe = ffmpeg.probe(uri, threads=1)
    video_stream = next(
        (
            stream
            for stream in probe["streams"]
            if stream["codec_type"] == "video"
        ),
        None,
    )
    width = int(video_stream["width"])
    height = int(video_stream["height"])
    num_frames = int(video_stream["nb_frames"])
    duration = float(video_stream["duration"])
    return (width, height, num_frames, duration)

# 定义 UDF:提取视频帧
@F.udf(returnType=ArrayType(BinaryType()))
def video2images(uri, width, height,
                 sample_rate: int = 5,
                 start: float = 0.0,
                 end: float = -1.0,
                 n_channels: int = 3):
    """
    Uses FFmpeg filters to extract image byte arrays
    and sampled & localized to a segment of video in time.
    """
    video_data, _ = (
        ffmpeg.input(uri, threads=1)
        .output(
            "pipe:",
            format="rawvideo",
            pix_fmt="rgb24",
            ss=start,
            t=end - start,
            r=1 / sample_rate,
        ).run(capture_stdout=True))
    img_size = height * width * n_channels
    return [video_data[idx:idx + img_size] for idx in range(0, len(video_data), img_size)]

# 添加视频元数据列
df = df.withColumn("metadata", video_probe(F.col("video_uri")))
print("With Metadata")
df.show(10, truncate=False)

# 提取视频帧
df = df.withColumn("frame", F.explode(
    video2images(F.col("video_uri"), F.col("metadata.width"), F.col("metadata.height"), F.lit(1), F.lit(0.0),
                 F.lit(5.0))))

# 定义人脸检测相关 Schema 和 UDF
box_struct = StructType(
    [
        StructField("xmin", IntegerType(), False),
        StructField("ymin", IntegerType(), False),
        StructField("xmax", IntegerType(), False),
        StructField("ymax", IntegerType(), False)
    ]
)

def bbox_helper(bbox):
    top, right, bottom, left = bbox
    bbox = [top, left, bottom, right]

    return list(map(lambda x: max(x, 0), bbox))

@F.udf(returnType=ArrayType(box_struct))
def face_detector(img_data, width=1920, height=1080, n_channels=3):
    img = np.frombuffer(img_data, np.uint8).reshape(height, width, n_channels)
    faces = face_recognition.face_locations(img)
    return [bbox_helper(f) for f in faces]

# 进行人脸检测
df = df.withColumn("faces", face_detector(F.col("frame"), F.col("metadata.width"), F.col("metadata.height")))

# 定义人脸追踪相关 Schema 和 UDF
annot_schema = ArrayType(
    StructType(
        [
            StructField("bbox", box_struct, False),
            StructField("tracker_id", StringType(), False),
        ]
    )
)

def bbox_iou(b1, b2):
    L = list(zip(b1, b2))
    left, top = np.max(L, axis=1)[:2]
    right, bottom = np.min(L, axis=1)[2:]
    if right < left or bottom < top:
        return 0
    b_area = lambda b: (b[2] - b[0]) * (b[3] - b[1])
    inter_area = b_area([left, top, right, bottom])
    b1_area, b2_area = b_area(b1), b_area(b2)
    iou = inter_area / float(b1_area + b2_area - inter_area)
    return iou

@F.udf(returnType=MapType(IntegerType(), IntegerType()))
def tracker_match(trackers, detections, bbox_col="bbox", threshold=0.3):
    """
    Match Bounding Boxes across successive image frames.
    """
    from scipy.optimize import linear_sum_assignment

    similarity = bbox_iou
    if not trackers or not detections:
        return {}
    if len(trackers) == len(detections) == 1:
        if (
                similarity(trackers[0][bbox_col], detections[0][bbox_col])
                >= threshold
        ):
            return {0: 0}

    sim_mat = np.array(
        [
            [
                similarity(tracked[bbox_col], detection[bbox_col])
                for tracked in trackers
            ]
            for detection in detections
        ],
        dtype=np.float32,
    )

    matched_idx = linear_sum_assignment(-sim_mat)
    matches = []
    for m in matched_idx:
        try:
            if sim_mat[m[0], m[1]] >= threshold:
                matches.append(m.reshape(1, 2))
        except:
            pass

    if len(matches) == 0:
        return {}
    else:
        matches = np.concatenate(matches, axis=0, dtype=int)

    rows, cols = zip(*np.where(matches))
    idx_map = {cols[idx]: rows[idx] for idx in range(len(rows))}
    return idx_map

@F.udf(returnType=ArrayType(box_struct))
def OFMotionModel(frame, prev_frame, bboxes, height, width):
    if not prev_frame:
        prev_frame = frame
    gray = cv2.cvtColor(np.frombuffer(frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)
    prev_gray = cv2.cvtColor(np.frombuffer(prev_frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)

    inst = cv2.DISOpticalFlow.create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)
    inst.setUseSpatialPropagation(False)

    flow = inst.calc(prev_gray, gray, None)

    h, w = flow.shape[:2]
    shifted_boxes = []
    for box in bboxes:
        xmin, ymin, xmax, ymax = box
        avg_y = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 0])
        avg_x = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 1])

        shifted_boxes.append(
            {"xmin": int(max(0, xmin + avg_x)), "ymin": int(max(0, ymin + avg_y)), "xmax": int(min(w, xmax + avg_x)),
             "ymax": int(min(h, ymax + avg_y))})
    return shifted_boxes

def match_annotations(iterator, segment_id="video_uri", id_col="tracker_id"):
    """
    Used by mapPartitions to iterate over the small chunks of our hierarchically-organized data.
    """

    matched_annots = []
    for idx, data in enumerate(iterator):
        data = data[1]
        if not idx:
            old_row = {idx: uuid.uuid4() for idx in range(len(data[1]))}
            old_row[segment_id] = data[0]
            pass
        annots = []
        curr_row = {segment_id: data[0]}
        if old_row[segment_id] != curr_row[segment_id]:
            old_row = {}
        if data[2] is not None:
            for ky, vl in data[2].items():
                detection = data[1][vl].asDict()
                detection[id_col] = old_row.get(ky, uuid.uuid4())
                curr_row[vl] = detection[id_col]
                annots.append(Row(**detection))
        matched_annots.append(annots)
        old_row = curr_row
    return matched_annots

def track_detections(df, segment_id="video_uri", frames="frame", detections="faces", optical_flow=True):
    id_col = "tracker_id"
    frame_window = Window().orderBy(frames)
    value_window = Window().orderBy("value")
    annot_window = Window.partitionBy(segment_id).orderBy(segment_id, frames)
    indexer = StringIndexer(inputCol=segment_id, outputCol="vidIndex")

    # adjust detections w/ optical flow
    if optical_flow:
        df = (
            df.withColumn("prev_frames", F.lag(F.col(frames)).over(annot_window))
            .withColumn(detections, OFMotionModel(F.col(frames), F.col("prev_frames"), F.col(detections), F.col("metadata.height"), F.col("metadata.width")))
        )

    df = (
        df.select(segment_id, frames, detections)
        .withColumn("bbox", F.explode(detections))
        .withColumn(id_col, F.lit(""))
        .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("trackables").alias("trackables"))
        .withColumn(
            "old_trackables", F.lag(F.col("trackables")).over(annot_window)
        )
        .withColumn(
            "matched",
            tracker_match(F.col("trackables"), F.col("old_trackables")),
        )
        .withColumn("frame_index", F.row_number().over(frame_window))
    )

    df = (
        indexer.fit(df)
        .transform(df)
        .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
    )
    unique_ids = df.select("vidIndex").distinct().count()
    matched = (
        df.select("vidIndex", segment_id, "trackables", "matched")
        .rdd.map(lambda x: (x[0], x[1:]))
        .partitionBy(unique_ids, lambda x: int(x[0]))
        .mapPartitions(match_annotations)
    )
    matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",
                                                                                       F.row_number().over(
                                                                                           value_window))

    return (
        df.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
        .withColumnRenamed("value", "trackers_matched")
        .withColumn("tracked", F.explode(F.col("trackers_matched")))
        .select(
            segment_id,
            frames,
            detections,
            F.col("tracked.{}".format("bbox")).alias("bbox"),
            F.col("tracked.{}".format(id_col)).alias(id_col),
        )
        .withColumn(id_col, F.sha2(F.concat(F.col(segment_id), F.col(id_col)), 256))
        .withColumn("tracked_detections", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("tracked_detections").alias("tracked_detections"))
        .orderBy(segment_id, frames, detections)
    )

# 定义 DetectionTracker Transformer
from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param

class DetectionTracker(Transformer, HasInputCol, HasOutputCol):
    """Detect and track."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Initialize."""
        super(DetectionTracker, self).__init__()
        self.framesCol = Param(self, "framesCol", "Column containing frames.")
        self.detectionsCol = Param(self, "detectionsCol", "Column containing detections.")
        self.optical_flow = Param(self, "optical_flow", "Use optical flow for tracker correction. Default is False")
        self._setDefault(framesCol="frame", detectionsCol="faces", optical_flow=False)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Get params."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setFramesCol(self, value):
        """Set framesCol."""
        return self._set(framesCol=value)

    def getFramesCol(self):
        """Get framesCol."""
        return self.getOrDefault(self.framesCol)

    def setDetectionsCol(self, value):
        """Set detectionsCol."""
        return self._set(detectionsCol=value)

    def getDetectionsCol(self):
        """Get detectionsCol."""
        return self.getOrDefault(self.detectionsCol)

    def setOpticalflow(self, value):
        """Set optical_flow."""
        return self._set(optical_flow=value)

    def getOpticalflow(self):
        """Get optical_flow."""
        return self.getOrDefault(self.optical_flow)

    def _transform(self, dataframe):
        """Do transformation."""
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        frames_col = self.getFramesCol()
        detections_col = self.getDetectionsCol()
        optical_flow = self.getOpticalflow()

        id_col = "tracker_id"
        frame_window = Window().orderBy(frames_col)
        value_window = Window().orderBy("value")
        annot_window = Window.partitionBy(input_col).orderBy(input_col, frames_col)
        indexer = StringIndexer(inputCol=input_col, outputCol="vidIndex")

        # adjust detections w/ optical flow
        if optical_flow:
            dataframe = (
                dataframe.withColumn("prev_frames", F.lag(F.col(frames_col)).over(annot_window))
                .withColumn(detections_col,
                            OFMotionModel(F.col(frames_col), F.col("prev_frames"), F.col(detections_col)))
            )

        dataframe = (
            dataframe.select(input_col, frames_col, detections_col)
            .withColumn("bbox", F.explode(detections_col))
            .withColumn(id_col, F.lit(""))
            .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list("trackables").alias("trackables"))
            .withColumn(
                "old_trackables", F.lag(F.col("trackables")).over(annot_window)
            )
            .withColumn(
                "matched",
                tracker_match(F.col("trackables"), F.col("old_trackables")),
            )
            .withColumn("frame_index", F.row_number().over(frame_window))
        )

        dataframe = (
            indexer.fit(dataframe)
            .transform(dataframe)
            .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
        )

        unique_ids = dataframe.select("vidIndex").distinct().count()
        matched = (
            dataframe.select("vidIndex", input_col, "trackables", "matched")
            .rdd.map(lambda x: (x[0], x[1:]))
            .partitionBy(unique_ids, lambda x: int(x[0]))
            .mapPartitions(match_annotations)
        )

        matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",
                                                                                           F.row_number().over(
                                                                                               value_window))

        return (
            dataframe.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
            .withColumnRenamed("value", "trackers_matched")
            .withColumn("tracked", F.explode(F.col("trackers_matched")))
            .select(
                input_col,
                frames_col,
                detections_col,
                F.col("tracked.{}".format("bbox")).alias("bbox"),
                F.col("tracked.{}".format(id_col)).alias(id_col),
            )
            .withColumn(id_col, F.sha2(F.concat(F.col(input_col), F.col(id_col)), 256))
            .withColumn(output_col, F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list(output_col).alias(output_col))
            .orderBy(input_col, frames_col, detections_col)
        )

# 使用 DetectionTracker
detectTracker = DetectionTracker(inputCol="video_uri", outputCol="tracked_detections")
print(type(detectTracker))

detectTracker.transform(df)
final = track_detections(df)

print("Final dataframe")
final.select("tracked_detections").show(100, truncate=False)

代码解释

  1. Spark 配置:

    恒浪威购商城
    恒浪威购商城

    基于asp.net2.0框架技术与企业级分布式框架以及与 ms sql server 2000数据库无缝集合而成,并且融合当前流行的ajax技术进行编写的电子商务系统,她整合了多用户商城、单用户商城功能和恒浪网站整合管理系统,吸收绝大部分同类产品的精华和优点,独创网络团购(b2t)电子商务模式,流程化的团购功能和视频导购等功能,是一款极具商业价值的电子商务系统。商城前台功能概述:商城会员可前台自行

    下载
    • 配置 SparkSession,设置 App 名称和 Master 节点。
    • 增加 spark.driver.memory 配置,避免内存不足。
  2. 视频元数据提取:

    • 使用 FFmpeg 提取视频的宽度、高度、帧数和时长。
    • video_probe UDF 用于获取视频元数据。
  3. 视频帧提取:

    • 使用 FFmpeg 提取视频帧,并将其转换为图像字节数组。
    • video2images UDF 用于提取视频帧。
  4. 人脸检测:

    • 使用 face_recognition 库检测视频帧中的人脸。
    • face_detector UDF 用于检测人脸,并返回人脸的边界框。
  5. 人脸追踪:

    • 使用光流法(Optical Flow)和匈牙利算法(Hungarian Algorithm)进行人脸追踪。
    • OFMotionModel UDF 用于使用光流法预测下一帧的人脸位置。
    • tracker_match UDF 用于匹配相邻帧中的人脸。
    • match_annotations 函数用于在分区中匹配人脸。
    • track_detections 函数用于整合人脸检测和追踪结果。
  6. DetectionTracker Transformer:

    • 将人脸检测和追踪逻辑封装成一个 Transformer,方便在 Spark ML Pipeline 中使用。

注意事项

  • 内存配置: 视频处理通常需要大量的内存。请根据实际情况调整 spark.driver.memory 和 spark.executor.memory 参数。
  • FFmpeg 安装: 确保 FFmpeg 安装正确,并且可以在系统中访问。
  • 视频文件路径: 确保视频文件路径正确,并且 Spark 可以访问这些文件。
  • UDF 性能: UDF 的性能可能不如 Spark 内置函数。在实际应用中,尽量使用 Spark 内置函数优化性能。
  • 并行度: 根据集群的规模和视频文件的大小,调整 Spark 的并行度,以充分利用集群的计算资源。

总结

本文档提供了一个使用 PySpark 并行处理视频文件的完整示例,包括视频元数据提取、视频帧提取、人脸检测和追踪。通过使用 Spark 的分布式计算能力,我们可以高效地处理大规模的视频数据,并进行各种视频分析任务。在实际应用中,可以根据具体需求调整代码,例如修改人脸检测算法、调整光流法参数等。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

330

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

235

2023.10.07

页面置换算法
页面置换算法

页面置换算法是操作系统中用来决定在内存中哪些页面应该被换出以便为新的页面提供空间的算法。本专题为大家提供页面置换算法的相关文章,大家可以免费体验。

411

2023.08.14

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

981

2023.11.02

常见的linux系统有哪些
常见的linux系统有哪些

linux系统有Ubuntu、Fedora、CentOS、Debian、openSUSE、Arch Linux、Gentoo、Slackware、Linux Mint、Kali Linux。更多关于linux系统的文章详情请阅读本专题下面的文章。php中文网欢迎大家前来学习。

816

2023.10.27

ubunt上安装和配置vnc
ubunt上安装和配置vnc

安装方法:安装VNC服务器、启动VNC服务器、设置VNC密码等等。想了解更多ubuntu的相关内容,可以阅读本专题下面的文章。

414

2023.12.28

ubuntu启动黑屏解决方法
ubuntu启动黑屏解决方法

ubuntu启动黑屏解决方法:检查是否是电源问题、检查内存是否接触不良、检查显卡问题等。想了解更多ubuntu的相关内容,可以阅读本专题下面的文章。

661

2023.12.28

为什么ubuntu有网络连接但不能上网
为什么ubuntu有网络连接但不能上网

ubuntu有网络连接但不能上网的原因:1、dns配置问题;2、代理服务器设置问题;3、网络防火墙设置问题;4、路由器或调制解调器设置问题;5、网络驱动程序问题;6、网络配置文件问题;7、其他问题。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

460

2024.09.05

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

9

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

相关下载

更多

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号