我是两个模型串联 一个是自定义yolo一个是自定义reid模型。那么我想添加帧差法自定义插件GS和根据deepstream-nvdsanalytics修改的ROI

################################################################################

nvdsanalytics 配置文件

ROI 区域过滤配置

################################################################################

[property]
enable=1

配置分辨率(必须与 streammux 一致!)

streammux 实际分辨率是 1920x1080(从 deepstream_pipeline_manager.py 第 1127-1128 行)

config-width=1280
config-height=720

osd-mode: 0=不显示, 1=显示线条和ROI, 2=显示所有信息

osd-mode=0
display-font-size=12

流 0 - 第一个摄像头的 ROI 配置

[roi-filtering-stream-0]
enable=1

ROI 区域坐标:左侧区域(可根据需要调整)

坐标:(0,0) → (960,0) → (960,1080) → (0,1080)

这定义了画面左半部分作为 ROI

roi-RF=0;200;750;200;0;720;750;720

inverse-roi=0 表示保留 ROI 内的对象,=1 表示保留 ROI 外的对象

inverse-roi=0

class-id=-1 表示所有类别,可指定特定类别如 class-id=0

class-id=-1 这里是ROI的配置。。。 def create_pipeline(self):
“”“创建单GPU的Pipeline,所有组件使用同一个GPU”“”
logger.info(f"[GPU {self.gpu_id}] 创建Pipeline,流数: {len(self.active_cameras)}")

    self.pipeline = Gst.Pipeline()

    # ====== 输入部分:多路RTSP源 ======
    streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
    if not streammux:
        raise RuntimeError("无法创建 nvstreammux 元素")

    # 对性能影响很重要的参数,设置为 8
    streammux.set_property('batch-size', len(self.active_cameras))

    # streammux.set_property('width', 1920)
    # streammux.set_property('height', 1080)

    # 归一化处理,适配大部分场景
    streammux.set_property('width', g_frame_width)
    streammux.set_property('height', g_frame_height)
    streammux.set_property('enable-padding', 1)  # 保持长宽比
    # 25 fps
    streammux.set_property('batched-push-timeout', 40000)

    streammux.set_property('live-source', 1)
    # 🎯 使用该Pipeline指定的GPU
    streammux.set_property('gpu-id', self.gpu_id)
    streammux.set_property('nvbuf-memory-type', 0)

    # 🔧 FIX: 防止单个源EOS导致整个pipeline结束
    try:
        streammux.set_property('drop-pipeline-eos', True)
        logger.info(f"[GPU {self.gpu_id}] nvstreammux 已启用 drop-pipeline-eos,忽略单源EOS")
    except Exception as e:
        logger.warning(f"[GPU {self.gpu_id}] 无法设置drop-pipeline-eos: {e}")

    # 🔧 FIX: 对于实时RTSP流,不同步输入以提高稳定性
    try:
        streammux.set_property('sync-inputs', 0)
        logger.info(f"[GPU {self.gpu_id}] nvstreammux 已禁用sync-inputs,提高RTSP稳定性")
    except Exception as e:
        logger.warning(f"[GPU {self.gpu_id}] 无法设置sync-inputs: {e}")

    self.pipeline.add(streammux)
    logger.info(f"[GPU {self.gpu_id}] nvstreammux 使用 GPU {self.gpu_id} (已应用EOS防护)")


    # 添加source bins
    for camera_id, camera_vo in self.active_cameras.items():
        sink_idx = self.stream_index_map[camera_id]
        src_bin = self._create_source_bin(camera_id, camera_vo.rtsp_url, self.gpu_id)
        self.pipeline.add(src_bin)
        sinkpad = streammux.request_pad_simple(f"sink_{sink_idx}")
        srcpad = src_bin.get_static_pad("src")
        if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
            raise RuntimeError(f"无法连接 {camera_id} 到 streammux")
        logger.info(f"[Pipeline] 链接流 {camera_id} 到 sink_{sink_idx}")

    # # ====== 推理部分:批处理推理 ======
    # queue1 = Gst.ElementFactory.make("queue", "queue1")
    # pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    # if not pgie:
    #     raise RuntimeError("无法创建 nvinfer 元素")
    #
    # # 🎯 使用该GPU对应的配置文件
    # yolov8_config = get_yolov8_config_path(self.gpu_id)
    # pgie.set_property('config-file-path', yolov8_config)

    # ====== 创建帧差法元素(在YOLO前进行运动检测) ======
    logger.info("=" * 80)
    logger.info("🎬 开始创建帧差法运动检测元素...")
    logger.info("=" * 80)

    # 🎯 加载本地的帧差法插件(不需要安装到系统)
    framediff_plugin_path = "/sgx/prod/eStellar/DeepStream-Yolo/frame_diff"
    framediff_so_path = os.path.join(framediff_plugin_path, "libgstframediff.so")

    logger.info(f"📂 检查帧差法插件路径: {framediff_plugin_path}")

    if not os.path.exists(framediff_plugin_path):
        logger.error(f"❌ 帧差法插件目录不存在: {framediff_plugin_path}")
        self.framediff = None
    elif not os.path.exists(framediff_so_path):
        logger.error(f"❌ 帧差法插件文件不存在: {framediff_so_path}")
        logger.error(f"   请先编译插件: cd {framediff_plugin_path} && make")
        self.framediff = None
    else:
        logger.info(f"✅ 找到帧差法插件: {framediff_so_path}")
        logger.info(f"   文件大小: {os.path.getsize(framediff_so_path) / 1024:.1f} KB")

        # 加载插件
        logger.info(f"🔧 正在加载帧差法插件...")
        registry = Gst.Registry.get()
        scan_result = registry.scan_path(framediff_plugin_path)
        logger.info(f"   扫描结果: {scan_result}")

        # 尝试创建元素
        logger.info(f"🔧 尝试创建 framediff 元素...")
        self.framediff = Gst.ElementFactory.make("framediff", "framediff")

        if not self.framediff:
            logger.error("❌ 无法创建 framediff 元素!")
            logger.error("   可能原因:")
            logger.error("   1. 插件未正确编译")
            logger.error("   2. 缺少依赖库(CUDA、DeepStream)")
            logger.error("   3. GStreamer 无法识别插件")
            logger.error(f"   编译命令: cd {framediff_plugin_path} && make")
            logger.error("   调试命令: gst-inspect-1.0 framediff")
            self.framediff = None
        else:
            logger.info("✅ framediff 元素创建成功!")

            # 🎯 配置帧差法参数(与 YOLO 同步:每 25 帧执行一次)
            logger.info("🔧 配置帧差法参数...")
            self.framediff.set_property("diff-threshold", 30)        # 像素差异阈值 (0-255)
            self.framediff.set_property("motion-threshold", 0.01)    # 运动比例阈值 (0.0-1.0)
            self.framediff.set_property("enable-debug", True)        # 🎯 启用调试输出
            self.framediff.set_property("gpu-id", self.gpu_id)       # GPU ID
            self.framediff.set_property("interval", 25)              # 🎯 每 25 帧计算一次(与 YOLO 同步)

            self.pipeline.add(self.framediff)
            logger.info("=" * 80)
            logger.info("🎉 帧差法配置完成!")
            logger.info(f"   GPU ID: {self.gpu_id}")
            logger.info(f"   推理间隔: 每 25 帧执行一次 (与 YOLO 同步)")
            logger.info(f"   像素差异阈值: 30")
            logger.info(f"   运动比例阈值: 0.01 (1%)")
            logger.info(f"   调试模式: 已启用")
            logger.info("=" * 80)

    # 创建YOLO主检测器
    print("✓ 创建YOLO主检测器...")
    self.pgie = Gst.ElementFactory.make("nvinfer", "pgie")
    if not self.pgie:
            print("✗ 无法创建YOLO检测器")
            return False



    self.pgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_primary_yoloV8.txt")
    self.pgie.set_property("batch-size", 6)
    self.pgie.set_property("unique-id", 1)
    self.pgie.set_property("process-mode", 1)
    self.pipeline.add(self.pgie)
    print(f"  配置: batch-size={6}, process-mode=1 (Full frame)")

    # 创建ReID副检测器
    print("✓ 创建ReID副检测器...")
    self.sgie = Gst.ElementFactory.make("nvinfer", "sgie")
    if not self.sgie:
        print("✗ 无法创建ReID检测器")
        return False


    # 🎯 使用该Pipeline指定的GPU进行推理
    self.sgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_reid.txt")
    self.sgie.set_property("batch-size", 6)
    self.sgie.set_property("unique-id", 2)
    self.sgie.set_property("process-mode", 2)
    self.pipeline.add(self.sgie)
    print(f"  配置: batch-size=6, process-mode=2 (Object level)")

    # 创建其他元素
    print("✓ 创建其他处理元素...")
    nvconv = Gst.ElementFactory.make("nvvideoconvert", "nvconv")
    nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd")
    sink = Gst.ElementFactory.make("fakesink", "sink")

    for elem in [nvconv, nvosd, sink]:
        if not elem:
            print("✗ 无法创建元素")
            return False
        self.pipeline.add(elem)

    sink.set_property("sync", False)

    # ========== 创建 tee 分支(检测+ReID 和 图片保存) ==========
    if self.debug_image:
        print("✓ 创建 tee 分支(启用图片保存调试)...")
        tee = Gst.ElementFactory.make("tee", "tee")
        if not tee:
            print("✗ 无法创建 tee 元素")
            return False
        self.pipeline.add(tee)

        # 分支1:检测 + ReID 分支
        queue_infer = Gst.ElementFactory.make("queue", "queue_infer")
        if not queue_infer:
            print("✗ 无法创建 queue_infer")
            return False
        self.pipeline.add(queue_infer)

        # 分支2:图片保存分支
        queue_image = Gst.ElementFactory.make("queue", "queue_image")
        nvconv_image = Gst.ElementFactory.make("nvvideoconvert", "nvconv_image")
        capsfilter_image = Gst.ElementFactory.make("capsfilter", "capsfilter_image")
        sink_image = Gst.ElementFactory.make("fakesink", "sink_image")

        for elem in [queue_image, nvconv_image, capsfilter_image, sink_image]:
            if not elem:
                print("✗ 无法创建图片保存分支元素")
                return False
            self.pipeline.add(elem)

        # 设置图片分支的 caps(转换为 BGR 格式供 OpenCV 使用)
        caps_image = Gst.Caps.from_string("video/x-raw, format=BGRx")
        capsfilter_image.set_property("caps", caps_image)
        sink_image.set_property("sync", False)
    else:
        print("✓ 跳过 tee 分支(图片保存调试已禁用)...")
        tee = None
        queue_infer = None

    # 链接所有元素
    print("🔗 链接Pipeline...")

    if self.debug_image:
        # ========== 启用图片保存调试:使用 tee 分支 ==========
        # 根据是否有 framediff 插件决定链接方式
        if self.framediff:
            # 有帧差法 (CUDA 版本): streammux -> framediff -> tee
            link_success = (streammux.link(self.framediff) and
                           self.framediff.link(tee))
            print("  链路: streammux -> framediff (CUDA) -> tee")
        else:
            # 无帧差法: streammux -> tee
            link_success = streammux.link(tee)
            print("  链路: streammux -> tee")

        if not link_success:
            print("✗ Pipeline链接到tee失败")
            return False

        # 链接 tee 的分支1:检测 + ReID
        # tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink
        tee_src_pad_infer = tee.request_pad_simple("src_%u")
        queue_infer_sink_pad = queue_infer.get_static_pad("sink")
        if tee_src_pad_infer.link(queue_infer_sink_pad) != Gst.PadLinkReturn.OK:
            print("✗ 无法链接 tee -> queue_infer")
            return False

        if not (queue_infer.link(self.pgie) and
                self.pgie.link(self.sgie) and
                self.sgie.link(nvconv) and
                nvconv.link(nvosd) and
                nvosd.link(sink)):
            print("✗ 检测+ReID分支链接失败")
            return False
        print("  分支1: tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink")

        # 链接 tee 的分支2:图片保存
        # tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image
        tee_src_pad_image = tee.request_pad_simple("src_%u")
        queue_image_sink_pad = queue_image.get_static_pad("sink")
        if tee_src_pad_image.link(queue_image_sink_pad) != Gst.PadLinkReturn.OK:
            print("✗ 无法链接 tee -> queue_image")
            return False

        if not (queue_image.link(nvconv_image) and
                nvconv_image.link(capsfilter_image) and
                capsfilter_image.link(sink_image)):
            print("✗ 图片保存分支链接失败")
            return False
        print("  分支2: tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image")

        # 添加图片保存分支的 Probe
        sink_image.get_static_pad("sink").add_probe(
            Gst.PadProbeType.BUFFER, self._image_save_probe, 0)
    else:
        # ========== 禁用图片保存调试:直接链接,无 tee 分支 ==========
        # 根据是否有 framediff 插件决定链接方式
        if self.framediff:
            # 有帧差法 (CUDA 版本,直接处理 NVMM buffer): streammux -> framediff -> pgie
            link_success = (streammux.link(self.framediff) and
                           self.framediff.link(self.pgie))
            print("  链路: streammux -> framediff (CUDA) -> pgie -> sgie -> nvconv -> nvosd -> sink")
        else:
            # 无帧差法: streammux -> pgie
            link_success = streammux.link(self.pgie)
            print("  链路: streammux -> pgie -> sgie -> nvconv -> nvosd -> sink")

        if not link_success:
            print("✗ Pipeline链接失败")
            return False

        # 直接链接检测和ReID
        if not (self.pgie.link(self.sgie) and
                self.sgie.link(nvconv) and
                nvconv.link(nvosd) and
                nvosd.link(sink)):
            print("✗ 检测+ReID链接失败")
            return False

    # 添加Probe
    print("📍 添加Probe...\n")

    self.pgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
                                              self.yolo_buffer_probe, 0)
    self.sgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
                                              self.reid_buffer_probe, 0)

    # 禁用 deepstream 默认的可视化
    # pgie.set_property('process-mode', 1)  # 1=PROCESS_MODE_CLIP_OBJECTS

    # # ====== 追踪部分:对象追踪(可选) ======
    # nvtracker = None
    # g_enable_tracking = False
    # if g_enable_tracking:
    #     nvtracker = Gst.ElementFactory.make("nvtracker", "tracker")
    #     if not nvtracker:
    #         raise RuntimeError("无法创建 nvtracker 元素")
    #
    #     # 配置追踪器 - 使用更保守的设置以避免内存崩溃
    #     nvtracker.set_property('tracker-width', 320)  # 减少分辨率以降低内存使用
    #     nvtracker.set_property('tracker-height', 192)  # 减少分辨率以降低内存使用
    #     # 🎯 使用该Pipeline指定的GPU进行跟踪
    #     nvtracker.set_property('gpu-id', self.gpu_id)
    #     nvtracker.set_property('ll-lib-file',
    #                            '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    #     logger.info(f"[GPU {self.gpu_id}] nvtracker 使用 GPU {self.gpu_id}")
    #
    #     # 使用平衡配置文件(优先保障目标检测)
    #     balanced_config_path = '/sgx/prod/estellar/agent/config/tracker_config_balanced.yml'
    #     safe_config_path = '/sgx/prod/estellar/agent/config/tracker_config_safe.yml'
    #
    #     if os.path.exists(balanced_config_path):
    #         nvtracker.set_property('ll-config-file', balanced_config_path)
    #         logger.info("使用平衡模式追踪器配置(优先保障目标检测)")
    #     elif os.path.exists(safe_config_path):
    #         nvtracker.set_property('ll-config-file', safe_config_path)
    #         logger.info("使用安全模式追踪器配置")
    #     else:
    #         nvtracker.set_property('ll-config-file', '/sgx/prod/estellar/agent/config/tracker_config.yml')
    #         logger.warning("平衡/安全配置文件不存在,使用默认配置")
    # else:
    #     logger.info("目标追踪已禁用,跳过追踪器配置")
    #
    # # ====== 分流部分:nvstreamdemux ======
    # nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "stream-demuxer")
    # if not nvstreamdemux:
    #     raise RuntimeError("无法创建 nvstreamdemux 元素")
    #
    # # 添加核心元素
    # elements = [queue1, pgie, nvstreamdemux]
    # if nvtracker:
    #     elements.insert(2, nvtracker)  # 在pgie和nvstreamdemux之间插入tracker
    # for elem in elements:
    #     self.pipeline.add(elem)
    #
    # # 链接核心pipeline
    # if not streammux.link(queue1):
    #     raise RuntimeError("无法连接 streammux -> queue1")
    # if not queue1.link(pgie):
    #     raise RuntimeError("无法连接 queue1 -> pgie")
    #
    # # 根据是否启用追踪,选择不同的链接方式
    # if nvtracker:
    #     if not pgie.link(nvtracker):
    #         raise RuntimeError("无法连接 pgie -> nvtracker")
    #     if not nvtracker.link(nvstreamdemux):
    #         raise RuntimeError("无法连接 nvtracker -> nvstreamdemux")
    #     logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvtracker -> nvstreamdemux")
    # else:
    #     if not pgie.link(nvstreamdemux):
    #         raise RuntimeError("无法连接 pgie -> nvstreamdemux")
    #     logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvstreamdemux (无追踪)")
    #
    # # ====== 输出部分:为每个流创建独立的编码推流分支 ======
    # for camera_id in self.active_cameras.keys():
    #     sink_idx = self.stream_index_map[camera_id]
    #     config = self.rtmp_configs[camera_id]
    #     self._create_output_branch(nvstreamdemux, camera_id, sink_idx, config)
    #
    # # 添加推理probe(在最后一个推理元素后获取结果)
    # if nvtracker:
    #     # 有追踪器时,在tracker之后获取追踪结果
    #     probe_pad = nvtracker.get_static_pad("src")
    #     logger.info("推理probe已绑定到nvtracker输出")
    # else:
    #     # 无追踪器时,在pgie之后获取检测结果
    #     probe_pad = pgie.get_static_pad("src")
    #     logger.info("推理probe已绑定到pgie输出(无追踪模式)")
    #
    # probe_pad.add_probe(Gst.PadProbeType.BUFFER, self._pgie_src_pad_buffer_probe, 0)
    #
    # 运行Pipeline
    print("=" * 80)
    print("🎬 运行中... (Ctrl+C停止)")
    print("=" * 80 + "\n")

    loop = GLib.MainLoop()
    bus = self.pipeline.get_bus()
    bus.add_signal_watch()

    def on_message(bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("\n✓ 到达EOS,停止Pipeline")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"\n✗ Pipeline错误: {err}")
            print(f"  详情: {debug}")
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            print(f"⚠️  警告: {err}")
        return True

    bus.connect("message", on_message, loop)
    self.pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except KeyboardInterrupt:
        print("\n⏹️  停止...\n")
    finally:
        self.pipeline.set_state(Gst.State.NULL)
        # self._print_summary()

    return True 这个是第二个模型的获取 ROI的区域 但是获取不到检测区域内的检测物体。。