Deepstream8.0 或者7.1 的版本添加帧差法 以及 ROI 区域过滤配置

以下是提问模板,可按需修改,仅保留适用于您的问题的条目。

• GPU:RTX PRO 2000(Blackwell 架构)
• DeepStream 版本:7.1 或 8.0
• TensorRT 版本:(请填写)
• NVIDIA 驱动版本:580.95.05
• 问题描述:在 DeepStream 8.0 或 7.1 上,添加自定义的帧差法插件以及 ROI 区域过滤配置,两者的效果都没有成功
def reid_buffer_probe(self, pad, info, u_data):
    """ReID feature-extraction probe.

    Attached to the SGIE src pad; walks the batch metadata, extracts the
    ReID tensor output (unique-id=2) for every detected object, groups the
    resulting detections by camera, and hands the batch to the async
    inference callback.

    Args:
        pad: the Gst.Pad the probe is attached to (unused).
        info: Gst.PadProbeInfo carrying the GstBuffer.
        u_data: opaque user data (unused).

    Returns:
        Gst.PadProbeReturn.OK always, so the buffer keeps flowing.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        if not batch_meta:
            return Gst.PadProbeReturn.OK

        l_frame = batch_meta.frame_meta_list

        # Detections grouped by camera:
        # [{'camera_id': 1, 'datasets': [detection1, detection2, ...]}, ...]
        detectionList = []
        camera_datasets_map = {}  # temporary camera_id -> detections map

        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                frame_num = frame_meta.frame_num
                pad_index = frame_meta.pad_index
                self.frame_count = frame_num

                # Map the streammux pad index back to a camera id.
                camera_id = None
                for cid, idx in self.stream_index_map.items():
                    if idx == pad_index:
                        camera_id = cid
                        break

                if camera_id is None:
                    camera_id = pad_index  # fall back to the raw pad index

                # Only extract features every `frame_interval` frames.
                if (frame_num % self.frame_interval) == 0:
                    # Ensure this camera has a datasets bucket.
                    if camera_id not in camera_datasets_map:
                        camera_datasets_map[camera_id] = []

                    # Walk the objects; ReID features live in per-object user meta.
                    l_obj = frame_meta.obj_meta_list

                    while l_obj is not None:
                        try:
                            obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)

                            # Iterate this object's user metadata entries.
                            l_user = obj_meta.obj_user_meta_list

                            while l_user is not None:
                                try:
                                    user_meta = pyds.NvDsUserMeta.cast(l_user.data)

                                    # Only tensor-output metadata carries features.
                                    if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
                                        tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)

                                        # The ReID model's output is tagged with unique-id=2.
                                        if tensor_meta.unique_id == 2 and tensor_meta.num_output_layers >= 1:
                                            try:
                                                # The first output layer holds the embedding.
                                                if hasattr(pyds, 'get_nvds_LayerInfo'):
                                                    layer_info = pyds.get_nvds_LayerInfo(tensor_meta, 0)
                                                else:
                                                    layer_info = tensor_meta.out_layer_info[0]

                                                # Extract the (L2-normalized) feature vector.
                                                features = self.extract_feature_vector(layer_info)

                                                # Build the detection record.
                                                detection = {
                                                    'bbox': [
                                                        int(obj_meta.rect_params.left),
                                                        int(obj_meta.rect_params.top),
                                                        int(obj_meta.rect_params.left + obj_meta.rect_params.width),
                                                        int(obj_meta.rect_params.top + obj_meta.rect_params.height)
                                                    ],
                                                    'class_id': int(obj_meta.class_id),
                                                    'confidence': float(obj_meta.confidence),
                                                    'tracker_confidence': 0.5,
                                                    'resize_shape': (g_frame_height, g_frame_width),
                                                    'class_name': obj_meta.obj_label if obj_meta.obj_label else f"class_{obj_meta.class_id}",
                                                    'is_tracked': 1,
                                                    'feature': features,
                                                    'vehicle_color': 0,
                                                    'vehicle_type': 0,
                                                    'or_sport': 0,
                                                    'camera_id': camera_id
                                                }

                                                # File the detection under its camera.
                                                camera_datasets_map[camera_id].append(detection)
                                                self.feature_count += 1

                                            except Exception as e:
                                                logger.warning(f"[Camera {camera_id}] 特征提取失败: {e}")

                                except StopIteration:
                                    break
                                try:
                                    l_user = l_user.next
                                except StopIteration:
                                    break

                        except StopIteration:
                            break
                        try:
                            l_obj = l_obj.next
                        except StopIteration:
                            break

                    # Demo mode: draw cached inference results onto the OSD every
                    # frame so the overlay does not flicker between inference frames.
                    camera_vo = self.active_cameras.get(camera_id)
                    if camera_vo and getattr(camera_vo, 'demo_enabled', 0):
                        self._smooth_car_visualization(frame_meta, batch_meta, camera_id)
                try:
                    l_frame = l_frame.next
                except StopIteration:
                    break

            except StopIteration:
                break

        # Build the final detectionList, stamped with the current time.
        current_time = time.time()
        for cam_id, datasets in camera_datasets_map.items():
            if datasets:  # only cameras that actually produced detections
                detectionList.append({
                    'camera_id': cam_id,
                    'datasets': datasets,
                    'current_time': current_time
                })

        # Batch summary logging.
        if detectionList:
            total_detections = sum(len(item['datasets']) for item in detectionList)
            logger.info(f"📊 本批次检测汇总: {len(detectionList)}个摄像头, 共{total_detections}个对象")
            for item in detectionList:
                logger.info(f"  Camera {item['camera_id']}: {len(item['datasets'])}个对象")

            # Persist detections to TXT at the camera's native resolution.
            for item in detectionList:
                cam_id = item['camera_id']
                datasets = item['datasets']
                camera_vo = self.active_cameras.get(cam_id)
                if camera_vo:
                    orig_width = getattr(camera_vo, 'width', 1920)
                    orig_height = getattr(camera_vo, 'height', 1080)
                else:
                    orig_width, orig_height = 1920, 1080
                self._save_detection_to_txt(cam_id, datasets, current_time, orig_width, orig_height)

            # Hand off to the async inference callback, if one is registered.
            # (detectionList is already known to be non-empty here.)
            if self.inference_callback:
                self._async_batch_inference_callback(detectionList)

    except Exception as e:
        logger.error(f"⚠️ ReID Probe异常: {e}")
        import traceback
        traceback.print_exc()

    return Gst.PadProbeReturn.OK

def _async_batch_inference_callback(self, detectionList):
    """Dispatch a multi-camera detection batch to the inference callback.

    Each item of detectionList has the shape:
        {'camera_id': int, 'datasets': list[dict], 'current_time': float}

    The callback runs on the thread pool; once it finishes,
    per-camera image-save flags are set via _on_inference_complete.
    """
    logger.info(f"[DeepStreamPipelineManager] 📤 批量推理回调: {len(detectionList)}个摄像头")

    try:
        if not self.inference_callback:
            logger.warning(f"[DeepStreamPipelineManager] ⚠️ 未设置推理回调函数")
            return

        # Per-camera summary logging.
        for entry in detectionList:
            camera_id = entry['camera_id']
            datasets = entry['datasets']
            logger.info(f"[DeepStreamPipelineManager]   Camera {camera_id}: {len(datasets)}个对象")

        # Run the callback off the streaming thread.
        logger.debug(f"[DeepStreamPipelineManager] 📤 提交到线程池处理")
        task = self.thread_pool.submit(self.inference_callback, detectionList)

        # After the callback completes, flag the cameras for image saving.
        task.add_done_callback(lambda _f: self._on_inference_complete(detectionList))

        logger.debug(f"[DeepStreamPipelineManager] ✅ 已提交到线程池")
    except Exception as e:
        logger.error(f"[DeepStreamPipelineManager] ❌ 批量推理回调异常: {e}")
        import traceback
        logger.error(traceback.format_exc())

def _on_inference_complete(self, detectionList):
    """Mark every camera in the finished batch as ready for image saving.

    Invoked after the inference callback (VehicleProcessor.process) has
    completed, i.e. once the in-memory cache holds the latest results.
    """
    try:
        stamp = time.time()

        with self.image_save_lock:
            for entry in detectionList:
                camera_id = entry['camera_id']
                # Flag this camera so the image-save probe persists a frame.
                self.image_save_flags[camera_id] = stamp
                logger.debug(f"[DeepStreamPipelineManager] ✅ 设置图片保存标志: camera_id={camera_id}")

    except Exception as e:
        logger.error(f"[DeepStreamPipelineManager] ❌ 设置图片保存标志异常: {e}")

def extract_feature_vector(self, layer_info):
    """Extract an L2-normalized feature vector from an NvDsInferLayerInfo.

    Args:
        layer_info: layer whose ``buffer`` is a PyCapsule wrapping a C float
            array and whose ``dims`` describes the layer shape.
            (Assumes the buffer holds FP32 data — TODO confirm against the
            ReID engine's output dtype.)

    Returns:
        list[float]: flattened, L2-normalized feature vector; all zeros when
        the buffer is missing or extraction fails.
    """
    import ctypes
    from ctypes import pythonapi, py_object

    # Flattened element count across all dims.
    feature_dim = 1
    for i in range(layer_info.dims.numDims):
        feature_dim *= layer_info.dims.d[i]

    feature_vector = [0.0] * feature_dim

    try:
        capsule = layer_info.buffer
        if capsule:
            # The buffer is exposed to Python as a PyCapsule; unwrap the raw
            # pointer via the C API (name=None matches an unnamed capsule).
            pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
            pythonapi.PyCapsule_GetPointer.argtypes = [py_object, ctypes.c_char_p]
            ptr_value = pythonapi.PyCapsule_GetPointer(capsule, None)

            if ptr_value:
                # Reinterpret as a C float array and bulk-copy it in one
                # slice instead of one Python call per element.
                float_ptr = ctypes.cast(ptr_value, ctypes.POINTER(ctypes.c_float))
                feature_vector = float_ptr[:feature_dim]

    except Exception as e:
        print(f"    ⚠️ 特征提取失败({e})")
        feature_vector = [0.0] * feature_dim

    # L2-normalize (consistent with the ONNX reference pipeline).
    feature_array = np.array(feature_vector, dtype=np.float32)
    norm = np.linalg.norm(feature_array)
    if norm > 1e-10:
        feature_array = feature_array / norm

    return feature_array.tolist()

def get_box_info(self, obj_meta):
    """Return (x1, y1, x2, y2, class_id) for a detection object.

    Args:
        obj_meta: object metadata exposing ``rect_params`` (left/top/width/
            height) and ``class_id``.

    Returns:
        tuple: (x1, y1, x2, y2, class_id); the sentinel (0, 0, 0, 0, -1)
        when the metadata is missing or malformed.
    """
    try:
        rect = obj_meta.rect_params
        x1 = rect.left
        y1 = rect.top
        # Convert (left, top, width, height) to corner coordinates.
        x2 = rect.left + rect.width
        y2 = rect.top + rect.height
        return (x1, y1, x2, y2, obj_meta.class_id)
    except (AttributeError, TypeError):
        # Narrowed from a bare `except`: only swallow malformed/missing
        # metadata; probe callbacks must not raise, so keep the sentinel.
        return (0, 0, 0, 0, -1)


# Demo mode
def _generate_camera_configs(self):
    """Assign a stream index to every active camera and build its RTMP config.

    Demo cameras get Aliyun push/pull URLs; all others get None for both.
    """
    for idx, (camera_id, camera_vo) in enumerate(self.active_cameras.items()):
        self.stream_index_map[camera_id] = idx

        is_demo = getattr(camera_vo, 'is_demo', False)
        push_url = None
        pull_url = None
        if is_demo:
            # RTMP endpoints are only provisioned in demo mode.
            from core.demo.aliyun_video import generate_push_url, generate_pull_url
            push_url = generate_push_url(camera_id)
            pull_url = generate_pull_url(camera_id, protocol="RTMP")

        self.rtmp_configs[camera_id] = {
            'push_url': push_url,
            'pull_url': pull_url,
            'sink_index': idx,
            'is_demo': is_demo,
        }

        if not push_url:
            logger.info(f"[{camera_id}] 非演示模式,无推流地址")
            continue
        logger.info(f"[{camera_id}] 推流地址: {push_url}")
        logger.info(f"[{camera_id}] 播放地址: {pull_url}")

# Vehicle-tracking demo mode
def _generate_camera_car_track_configs(self):
    """Assign stream indices and build RTMP configs for the car-tracking demo.

    Demo-enabled cameras additionally get their push/pull URLs published
    over MQTT via _publish_demo_url.
    """
    for idx, (camera_id, camera_vo) in enumerate(self.active_cameras.items()):
        self.stream_index_map[camera_id] = idx

        demo_enabled = getattr(camera_vo, 'demo_enabled', 0)
        push_url = None
        pull_url = None
        if demo_enabled:
            # RTMP endpoints are only provisioned for demo-enabled cameras.
            from core.demo.aliyun_video import generate_push_url, generate_pull_url
            push_url = generate_push_url(camera_id)
            pull_url = generate_pull_url(camera_id, protocol="RTMP")

        self.rtmp_configs[camera_id] = {
            'push_url': push_url,
            'pull_url': pull_url,
            'sink_index': idx,
            'demo_enabled': demo_enabled,
        }

        if not push_url:
            logger.info(f"[{camera_id}] 非演示模式,无推流地址")
            continue
        logger.info(f"[{camera_id}] 推流地址: {push_url}")
        logger.info(f"[{camera_id}] 播放地址: {pull_url}")
        self._publish_demo_url(camera_id, push_url, pull_url)

def _publish_demo_url(self, camera_id: str, push_url: str, pull_url: str) -> None:
    """Publish the vehicle-tracking demo stream URLs over MQTT.

    Failures are logged and swallowed: publishing a demo URL is best-effort
    and must not break pipeline setup.
    """
    try:
        from core.comm.mqtt_client import MQTTClient
        logger.info(f"############# [demo_url_push] gggg")
        event = {
            "camera_id": camera_id,
            "push_url": push_url,
            "pull_url": pull_url,
            "event_type": "demo_url_push",
            "source_id": camera_id,
            "attributes": {},
            "event_id": str(uuid.uuid4()),
            "event_timestamp": int(time.time() * 1000),
        }
        MQTTClient.get_instance().publish_message("event/demo_url_push", event, qos=1)
        logger.info(f"############# [demo_url_push] [{camera_id}] done {pull_url}")
    except Exception as e:
        logger.error(f"[{camera_id}] 推流地址上报失败: {e}")
def _update_pipeline_stats(self):
    """Count an inference frame and periodically log pipeline throughput stats."""
    if not self.running:
        return

    self._stats_frames_since_report += 1
    now = time.time()
    elapsed = now - self._stats_last_report_time
    if elapsed < self._stats_interval:
        return

    frames = self._stats_frames_since_report
    fps = frames / elapsed if elapsed > 0 else 0.0

    # Summarize the spread of per-camera inference intervals.
    intervals = list(self.camera_inference_intervals.values())
    interval_summary = (
        f"{min(intervals):.2f}~{max(intervals):.2f}s" if intervals else "N/A"
    )

    logger.info(
        f"[GPU {self.gpu_id}] Pipeline运行统计 | 最近{elapsed:.1f}s推理帧: {frames} "
        f"| 平均FPS: {fps:.2f} | 摄像头数: {len(self.active_cameras)} "
        f"| 推理间隔: {interval_summary} | nvinfer.interval: {g_inference_interval} "
        f"| 编码器: {self.encoder_name or 'N/A'} | 状态: {'running' if self.running else 'stopped'}"
    )

    self._stats_last_report_time = now
    self._stats_frames_since_report = 0

def create_pipeline(self):
    """Create and run the single-GPU pipeline; every element is pinned to self.gpu_id.

    Topology (inference path):
        N source bins -> nvstreammux -> [framediff] -> [tee] -> pgie (YOLO)
        -> sgie (ReID) -> nvvideoconvert -> nvdsosd -> fakesink
    When self.debug_image is set, a tee adds a second branch that converts
    frames to BGRx for an image-save probe.

    NOTE(review): despite the name, this method also starts a GLib.MainLoop
    and blocks until EOS, an error, or KeyboardInterrupt. It returns True on
    normal shutdown and False when element creation/linking fails, but
    raises RuntimeError for streammux/source failures — mixed error styles.
    """
    logger.info(f"[GPU {self.gpu_id}] 创建Pipeline,流数: {len(self.active_cameras)}")

    self.pipeline = Gst.Pipeline()

    # ====== Input stage: multiple RTSP sources ======
    streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
    if not streammux:
        raise RuntimeError("无法创建 nvstreammux 元素")

    # Batch size is performance-critical; one batch slot per active stream.
    streammux.set_property('batch-size', len(self.active_cameras))

    # streammux.set_property('width', 1920)
    # streammux.set_property('height', 1080)

    # Normalize every stream to a common resolution (fits most scenes).
    streammux.set_property('width', g_frame_width)
    streammux.set_property('height', g_frame_height)
    streammux.set_property('enable-padding', 1)  # letterbox: keep aspect ratio
    # ~25 fps: push a (possibly partial) batch at most every 40 ms
    streammux.set_property('batched-push-timeout', 40000)

    streammux.set_property('live-source', 1)
    # Pin to the GPU assigned to this pipeline.
    streammux.set_property('gpu-id', self.gpu_id)
    streammux.set_property('nvbuf-memory-type', 0)

    # FIX: keep the whole pipeline alive when a single source hits EOS.
    try:
        streammux.set_property('drop-pipeline-eos', True)
        logger.info(f"[GPU {self.gpu_id}] nvstreammux 已启用 drop-pipeline-eos,忽略单源EOS")
    except Exception as e:
        logger.warning(f"[GPU {self.gpu_id}] 无法设置drop-pipeline-eos: {e}")

    # FIX: for live RTSP streams, do not sync inputs (improves stability).
    try:
        streammux.set_property('sync-inputs', 0)
        logger.info(f"[GPU {self.gpu_id}] nvstreammux 已禁用sync-inputs,提高RTSP稳定性")
    except Exception as e:
        logger.warning(f"[GPU {self.gpu_id}] 无法设置sync-inputs: {e}")

    self.pipeline.add(streammux)
    logger.info(f"[GPU {self.gpu_id}] nvstreammux 使用 GPU {self.gpu_id} (已应用EOS防护)")


    # Add one source bin per camera and link it to its streammux sink pad.
    for camera_id, camera_vo in self.active_cameras.items():
        sink_idx = self.stream_index_map[camera_id]
        src_bin = self._create_source_bin(camera_id, camera_vo.rtsp_url, self.gpu_id)
        self.pipeline.add(src_bin)
        sinkpad = streammux.request_pad_simple(f"sink_{sink_idx}")
        srcpad = src_bin.get_static_pad("src")
        if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
            raise RuntimeError(f"无法连接 {camera_id} 到 streammux")
        logger.info(f"[Pipeline] 链接流 {camera_id} 到 sink_{sink_idx}")

    # # ====== 推理部分:批处理推理 ======
    # queue1 = Gst.ElementFactory.make("queue", "queue1")
    # pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    # if not pgie:
    #     raise RuntimeError("无法创建 nvinfer 元素")
    #
    # # 🎯 使用该GPU对应的配置文件
    # yolov8_config = get_yolov8_config_path(self.gpu_id)
    # pgie.set_property('config-file-path', yolov8_config)

    # ====== Frame-difference element (motion gating ahead of YOLO) ======
    logger.info("=" * 80)
    logger.info("🎬 开始创建帧差法运动检测元素...")
    logger.info("=" * 80)

    # Load the locally-built frame-diff plugin (no system-wide install needed).
    framediff_plugin_path = "/sgx/prod/eStellar/DeepStream-Yolo/frame_diff"
    framediff_so_path = os.path.join(framediff_plugin_path, "libgstframediff.so")

    logger.info(f"📂 检查帧差法插件路径: {framediff_plugin_path}")

    if not os.path.exists(framediff_plugin_path):
        logger.error(f"❌ 帧差法插件目录不存在: {framediff_plugin_path}")
        self.framediff = None
    elif not os.path.exists(framediff_so_path):
        logger.error(f"❌ 帧差法插件文件不存在: {framediff_so_path}")
        logger.error(f"   请先编译插件: cd {framediff_plugin_path} && make")
        self.framediff = None
    else:
        logger.info(f"✅ 找到帧差法插件: {framediff_so_path}")
        logger.info(f"   文件大小: {os.path.getsize(framediff_so_path) / 1024:.1f} KB")

        # Register the plugin directory with the GStreamer registry.
        logger.info(f"🔧 正在加载帧差法插件...")
        registry = Gst.Registry.get()
        scan_result = registry.scan_path(framediff_plugin_path)
        logger.info(f"   扫描结果: {scan_result}")

        # Try to instantiate the element.
        logger.info(f"🔧 尝试创建 framediff 元素...")
        self.framediff = Gst.ElementFactory.make("framediff", "framediff")

        if not self.framediff:
            logger.error("❌ 无法创建 framediff 元素!")
            logger.error("   可能原因:")
            logger.error("   1. 插件未正确编译")
            logger.error("   2. 缺少依赖库(CUDA、DeepStream)")
            logger.error("   3. GStreamer 无法识别插件")
            logger.error(f"   编译命令: cd {framediff_plugin_path} && make")
            logger.error("   调试命令: gst-inspect-1.0 framediff")
            self.framediff = None
        else:
            logger.info("✅ framediff 元素创建成功!")

            # Configure frame-diff parameters (interval=25 keeps it in step with YOLO).
            logger.info("🔧 配置帧差法参数...")
            self.framediff.set_property("diff-threshold", 30)        # per-pixel difference threshold (0-255)
            self.framediff.set_property("motion-threshold", 0.01)    # moving-pixel ratio threshold (0.0-1.0)
            self.framediff.set_property("enable-debug", True)        # verbose debug output
            self.framediff.set_property("gpu-id", self.gpu_id)       # GPU ID
            self.framediff.set_property("interval", 25)              # compute every 25 frames (matches YOLO)

            self.pipeline.add(self.framediff)
            logger.info("=" * 80)
            logger.info("🎉 帧差法配置完成!")
            logger.info(f"   GPU ID: {self.gpu_id}")
            logger.info(f"   推理间隔: 每 25 帧执行一次 (与 YOLO 同步)")
            logger.info(f"   像素差异阈值: 30")
            logger.info(f"   运动比例阈值: 0.01 (1%)")
            logger.info(f"   调试模式: 已启用")
            logger.info("=" * 80)

    # Primary detector: YOLO (full-frame inference, unique-id=1).
    print("✓ 创建YOLO主检测器...")
    self.pgie = Gst.ElementFactory.make("nvinfer", "pgie")
    if not self.pgie:
            print("✗ 无法创建YOLO检测器")
            return False



    # NOTE(review): batch-size is hard-coded to 6 here while streammux uses
    # len(self.active_cameras) — confirm they are meant to match.
    self.pgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_primary_yoloV8.txt")
    self.pgie.set_property("batch-size", 6)
    self.pgie.set_property("unique-id", 1)
    self.pgie.set_property("process-mode", 1)
    self.pipeline.add(self.pgie)
    print(f"  配置: batch-size={6}, process-mode=1 (Full frame)")

    # Secondary network: ReID (operates on detected objects, unique-id=2).
    print("✓ 创建ReID副检测器...")
    self.sgie = Gst.ElementFactory.make("nvinfer", "sgie")
    if not self.sgie:
        print("✗ 无法创建ReID检测器")
        return False


    # Run inference on the GPU assigned to this pipeline.
    self.sgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_reid.txt")
    self.sgie.set_property("batch-size", 6)
    self.sgie.set_property("unique-id", 2)
    self.sgie.set_property("process-mode", 2)
    self.pipeline.add(self.sgie)
    print(f"  配置: batch-size=6, process-mode=2 (Object level)")

    # Remaining downstream elements (convert -> OSD -> sink).
    print("✓ 创建其他处理元素...")
    nvconv = Gst.ElementFactory.make("nvvideoconvert", "nvconv")
    nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd")
    sink = Gst.ElementFactory.make("fakesink", "sink")

    for elem in [nvconv, nvosd, sink]:
        if not elem:
            print("✗ 无法创建元素")
            return False
        self.pipeline.add(elem)

    sink.set_property("sync", False)

    # ========== tee branches: (detection + ReID) and (image saving) ==========
    if self.debug_image:
        print("✓ 创建 tee 分支(启用图片保存调试)...")
        tee = Gst.ElementFactory.make("tee", "tee")
        if not tee:
            print("✗ 无法创建 tee 元素")
            return False
        self.pipeline.add(tee)

        # Branch 1: detection + ReID.
        queue_infer = Gst.ElementFactory.make("queue", "queue_infer")
        if not queue_infer:
            print("✗ 无法创建 queue_infer")
            return False
        self.pipeline.add(queue_infer)

        # Branch 2: image saving.
        queue_image = Gst.ElementFactory.make("queue", "queue_image")
        nvconv_image = Gst.ElementFactory.make("nvvideoconvert", "nvconv_image")
        capsfilter_image = Gst.ElementFactory.make("capsfilter", "capsfilter_image")
        sink_image = Gst.ElementFactory.make("fakesink", "sink_image")

        for elem in [queue_image, nvconv_image, capsfilter_image, sink_image]:
            if not elem:
                print("✗ 无法创建图片保存分支元素")
                return False
            self.pipeline.add(elem)

        # Caps for the image branch (BGRx so OpenCV can consume frames).
        caps_image = Gst.Caps.from_string("video/x-raw, format=BGRx")
        capsfilter_image.set_property("caps", caps_image)
        sink_image.set_property("sync", False)
    else:
        print("✓ 跳过 tee 分支(图片保存调试已禁用)...")
        tee = None
        queue_infer = None

    # Link all elements.
    print("🔗 链接Pipeline...")

    if self.debug_image:
        # ========== Image-save debug enabled: route through the tee ==========
        # Whether framediff sits before the tee depends on plugin availability.
        if self.framediff:
            # With frame-diff (CUDA build): streammux -> framediff -> tee
            link_success = (streammux.link(self.framediff) and
                           self.framediff.link(tee))
            print("  链路: streammux -> framediff (CUDA) -> tee")
        else:
            # Without frame-diff: streammux -> tee
            link_success = streammux.link(tee)
            print("  链路: streammux -> tee")

        if not link_success:
            print("✗ Pipeline链接到tee失败")
            return False

        # Link tee branch 1: detection + ReID.
        # tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink
        tee_src_pad_infer = tee.request_pad_simple("src_%u")
        queue_infer_sink_pad = queue_infer.get_static_pad("sink")
        if tee_src_pad_infer.link(queue_infer_sink_pad) != Gst.PadLinkReturn.OK:
            print("✗ 无法链接 tee -> queue_infer")
            return False

        if not (queue_infer.link(self.pgie) and
                self.pgie.link(self.sgie) and
                self.sgie.link(nvconv) and
                nvconv.link(nvosd) and
                nvosd.link(sink)):
            print("✗ 检测+ReID分支链接失败")
            return False
        print("  分支1: tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink")

        # Link tee branch 2: image saving.
        # tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image
        tee_src_pad_image = tee.request_pad_simple("src_%u")
        queue_image_sink_pad = queue_image.get_static_pad("sink")
        if tee_src_pad_image.link(queue_image_sink_pad) != Gst.PadLinkReturn.OK:
            print("✗ 无法链接 tee -> queue_image")
            return False

        if not (queue_image.link(nvconv_image) and
                nvconv_image.link(capsfilter_image) and
                capsfilter_image.link(sink_image)):
            print("✗ 图片保存分支链接失败")
            return False
        print("  分支2: tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image")

        # Attach the image-save probe on the image branch's sink pad.
        sink_image.get_static_pad("sink").add_probe(
            Gst.PadProbeType.BUFFER, self._image_save_probe, 0)
    else:
        # ========== Image-save debug disabled: direct linking, no tee ==========
        # Whether framediff sits before pgie depends on plugin availability.
        if self.framediff:
            # With frame-diff (CUDA build, consumes NVMM buffers directly): streammux -> framediff -> pgie
            link_success = (streammux.link(self.framediff) and
                           self.framediff.link(self.pgie))
            print("  链路: streammux -> framediff (CUDA) -> pgie -> sgie -> nvconv -> nvosd -> sink")
        else:
            # Without frame-diff: streammux -> pgie
            link_success = streammux.link(self.pgie)
            print("  链路: streammux -> pgie -> sgie -> nvconv -> nvosd -> sink")

        if not link_success:
            print("✗ Pipeline链接失败")
            return False

        # Link detection and ReID directly.
        if not (self.pgie.link(self.sgie) and
                self.sgie.link(nvconv) and
                nvconv.link(nvosd) and
                nvosd.link(sink)):
            print("✗ 检测+ReID链接失败")
            return False

    # Attach the inference probes on the detector/ReID src pads.
    print("📍 添加Probe...\n")

    self.pgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
                                              self.yolo_buffer_probe, 0)
    self.sgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
                                              self.reid_buffer_probe, 0)

    # DeepStream's default visualization is left disabled.
    # pgie.set_property('process-mode', 1)  # 1=PROCESS_MODE_CLIP_OBJECTS

    # # ====== 追踪部分:对象追踪(可选) ======
    # nvtracker = None
    # g_enable_tracking = False
    # if g_enable_tracking:
    #     nvtracker = Gst.ElementFactory.make("nvtracker", "tracker")
    #     if not nvtracker:
    #         raise RuntimeError("无法创建 nvtracker 元素")
    #
    #     # 配置追踪器 - 使用更保守的设置以避免内存崩溃
    #     nvtracker.set_property('tracker-width', 320)  # 减少分辨率以降低内存使用
    #     nvtracker.set_property('tracker-height', 192)  # 减少分辨率以降低内存使用
    #     # 🎯 使用该Pipeline指定的GPU进行跟踪
    #     nvtracker.set_property('gpu-id', self.gpu_id)
    #     nvtracker.set_property('ll-lib-file',
    #                            '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    #     logger.info(f"[GPU {self.gpu_id}] nvtracker 使用 GPU {self.gpu_id}")
    #
    #     # 使用平衡配置文件(优先保障目标检测)
    #     balanced_config_path = '/sgx/prod/estellar/agent/config/tracker_config_balanced.yml'
    #     safe_config_path = '/sgx/prod/estellar/agent/config/tracker_config_safe.yml'
    #
    #     if os.path.exists(balanced_config_path):
    #         nvtracker.set_property('ll-config-file', balanced_config_path)
    #         logger.info("使用平衡模式追踪器配置(优先保障目标检测)")
    #     elif os.path.exists(safe_config_path):
    #         nvtracker.set_property('ll-config-file', safe_config_path)
    #         logger.info("使用安全模式追踪器配置")
    #     else:
    #         nvtracker.set_property('ll-config-file', '/sgx/prod/estellar/agent/config/tracker_config.yml')
    #         logger.warning("平衡/安全配置文件不存在,使用默认配置")
    # else:
    #     logger.info("目标追踪已禁用,跳过追踪器配置")
    #
    # # ====== 分流部分:nvstreamdemux ======
    # nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "stream-demuxer")
    # if not nvstreamdemux:
    #     raise RuntimeError("无法创建 nvstreamdemux 元素")
    #
    # # 添加核心元素
    # elements = [queue1, pgie, nvstreamdemux]
    # if nvtracker:
    #     elements.insert(2, nvtracker)  # 在pgie和nvstreamdemux之间插入tracker
    # for elem in elements:
    #     self.pipeline.add(elem)
    #
    # # 链接核心pipeline
    # if not streammux.link(queue1):
    #     raise RuntimeError("无法连接 streammux -> queue1")
    # if not queue1.link(pgie):
    #     raise RuntimeError("无法连接 queue1 -> pgie")
    #
    # # 根据是否启用追踪,选择不同的链接方式
    # if nvtracker:
    #     if not pgie.link(nvtracker):
    #         raise RuntimeError("无法连接 pgie -> nvtracker")
    #     if not nvtracker.link(nvstreamdemux):
    #         raise RuntimeError("无法连接 nvtracker -> nvstreamdemux")
    #     logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvtracker -> nvstreamdemux")
    # else:
    #     if not pgie.link(nvstreamdemux):
    #         raise RuntimeError("无法连接 pgie -> nvstreamdemux")
    #     logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvstreamdemux (无追踪)")
    #
    # # ====== 输出部分:为每个流创建独立的编码推流分支 ======
    # for camera_id in self.active_cameras.keys():
    #     sink_idx = self.stream_index_map[camera_id]
    #     config = self.rtmp_configs[camera_id]
    #     self._create_output_branch(nvstreamdemux, camera_id, sink_idx, config)
    #
    # # 添加推理probe(在最后一个推理元素后获取结果)
    # if nvtracker:
    #     # 有追踪器时,在tracker之后获取追踪结果
    #     probe_pad = nvtracker.get_static_pad("src")
    #     logger.info("推理probe已绑定到nvtracker输出")
    # else:
    #     # 无追踪器时,在pgie之后获取检测结果
    #     probe_pad = pgie.get_static_pad("src")
    #     logger.info("推理probe已绑定到pgie输出(无追踪模式)")
    #
    # probe_pad.add_probe(Gst.PadProbeType.BUFFER, self._pgie_src_pad_buffer_probe, 0)
    #
    # Run the pipeline (blocks in the GLib main loop until EOS/error/interrupt).
    print("=" * 80)
    print("🎬 运行中... (Ctrl+C停止)")
    print("=" * 80 + "\n")

    loop = GLib.MainLoop()
    bus = self.pipeline.get_bus()
    bus.add_signal_watch()

    def on_message(bus, message, loop):
        # Bus watcher: quit the loop on EOS or error, log warnings.
        t = message.type
        if t == Gst.MessageType.EOS:
            print("\n✓ 到达EOS,停止Pipeline")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"\n✗ Pipeline错误: {err}")
            print(f"  详情: {debug}")
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            print(f"⚠️  警告: {err}")
        return True

    bus.connect("message", on_message, loop)
    self.pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except KeyboardInterrupt:
        print("\n⏹️  停止...\n")
    finally:
        # Always tear the pipeline down to release GPU/encoder resources.
        self.pipeline.set_state(Gst.State.NULL)
        # self._print_summary()

    return True

def _create_source_bin(self, camera_id, rtsp_url, gpu_id=0):
    """
    Create a source bin (uridecodebin -> nvvideoconvert) for one RTSP camera.

    The nvvideoconvert stage converts the decoder's raw buffers into NVMM
    buffers so the bin can feed nvstreammux directly. The bin exposes a
    single "src" ghost pad bound to nvvideoconvert's src pad.

    Args:
        camera_id: camera identifier, used in element names and log lines
        rtsp_url: RTSP stream URI
        gpu_id: GPU index assigned to the conversion element

    Returns:
        Gst.Bin: the assembled source bin.

    Raises:
        RuntimeError: if a required GStreamer element cannot be created
            or the ghost pad cannot be added.
    """
    bin_name = f"source-bin-{camera_id}"
    nbin = Gst.Bin.new(bin_name)
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", None)

    if not uri_decode_bin:
        raise RuntimeError(f"无法创建 uridecodebin for {camera_id}")

    uri_decode_bin.set_property("uri", rtsp_url)

    # Avoid premature EOS: do not emit EOS after a segment. The property
    # may not exist on every GStreamer build, hence the guard.
    try:
        uri_decode_bin.set_property("eos-after-segment", False)
        logger.info(f"[{camera_id}] 已禁用eos-after-segment,防止提前EOS")
    except Exception as e:
        logger.warning(f"[{camera_id}] 无法设置eos-after-segment: {e}")

    # Raise buffering for stability. NOTE: buffer-size is deliberately left
    # at its default (the 512KB line is disabled); only the 2-second
    # buffer-duration (in nanoseconds) is applied.
    # FIX: the log message previously claimed "512KB, 2秒" even though the
    # buffer-size setting is commented out; it now reports only what is set.
    try:
        # uri_decode_bin.set_property("buffer-size", 524288)  # 512KB buffer (disabled)
        uri_decode_bin.set_property("buffer-duration", 2000000000)  # 2 s in ns
        logger.info(f"[{camera_id}] 已设置缓冲时长: 2秒")
    except Exception as e:
        logger.warning(f"[{camera_id}] 无法设置缓冲区参数: {e}")

    # Unrestricted connection speed to avoid timeout-triggered EOS
    try:
        uri_decode_bin.set_property("connection-speed", 0)  # 0 = unlimited
        logger.info(f"[{camera_id}] 已设置无限制连接速度")
    except Exception as e:
        logger.warning(f"[{camera_id}] 无法设置connection-speed: {e}")

    # nvvideoconvert: raw buffer -> NVMM buffer on the assigned GPU
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv-src-{camera_id}")
    if not nvvidconv:
        raise RuntimeError(f"无法创建 nvvideoconvert for {camera_id}")

    nvvidconv.set_property("gpu-id", gpu_id)

    Gst.Bin.add(nbin, uri_decode_bin)
    Gst.Bin.add(nbin, nvvidconv)

    # uridecodebin pads appear dynamically; link video pads only.
    # camera_id is captured explicitly for the closure below.
    cam_id_copy = camera_id

    def on_pad_added(dbin, pad, nvconv):
        """Link a newly added uridecodebin pad to nvvideoconvert (video pads only)."""
        caps = pad.get_current_caps()
        if caps is None:
            caps = pad.query_caps(None)

        if caps is None:
            return

        struct = caps.get_structure(0)
        if struct is None:
            return

        name = struct.get_name()
        if not name.startswith("video/"):
            # Ignore non-video pads (e.g. audio)
            return

        sinkpad = nvconv.get_static_pad("sink")
        if sinkpad and not sinkpad.is_linked():
            ret = pad.link(sinkpad)
            if ret == Gst.PadLinkReturn.OK:
                logger.info(f"[{cam_id_copy}] 已连接 uridecodebin -> nvvideoconvert (视频)")
            else:
                logger.warning(f"[{cam_id_copy}] 连接失败: {ret}")

    uri_decode_bin.connect("pad-added", on_pad_added, nvvidconv)

    # Ghost pad exposing nvvideoconvert's output as the bin's "src".
    # FIX: verify add_pad succeeded instead of ignoring its return value.
    nvconv_srcpad = nvvidconv.get_static_pad("src")
    if not nbin.add_pad(Gst.GhostPad.new("src", nvconv_srcpad)):
        raise RuntimeError(f"无法为 {camera_id} 添加 ghost pad")

    logger.info(f"[{camera_id}] Source bin created for {rtsp_url} on GPU {gpu_id} (with nvvideoconvert)")
    return nbin

def _create_output_branch(self, nvstreamdemux, camera_id, sink_idx, config):
    """Build one per-stream output branch: demux src -> queue -> nvvideoconvert -> nvdsosd -> sink."""
    logger.info(f"[{camera_id}] 创建输出分支 sink_{sink_idx}")

    queue_out = Gst.ElementFactory.make("queue", f"queue-out-{camera_id}")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv-{camera_id}")
    nvosd = Gst.ElementFactory.make("nvdsosd", f"nvosd-{camera_id}")

    base_elements = [queue_out, nvvidconv, nvosd]
    if not all(base_elements):
        raise RuntimeError(f"无法创建 {camera_id} 的基础输出元素")

    for element in base_elements:
        self.pipeline.add(element)

    # Link the base chain pairwise
    link_pairs = [
        (queue_out, nvvidconv, "queue_out -> nvvidconv"),
        (nvvidconv, nvosd, "nvvidconv -> nvosd"),
    ]
    for upstream, downstream, desc in link_pairs:
        if not upstream.link(downstream):
            raise RuntimeError(f"无法连接 {camera_id} {desc}")

    # Demo streams get an RTMP push; everything else terminates in a fakesink
    if config['is_demo'] and config['push_url']:
        self._create_rtmp_output(camera_id, nvosd, config['push_url'])
    else:
        self._create_fake_output(camera_id, nvosd)

    # Attach this branch to the demuxer's per-stream src pad
    demux_src_pad = nvstreamdemux.request_pad_simple(f"src_{sink_idx}")
    queue_sink_pad = queue_out.get_static_pad("sink")
    if demux_src_pad.link(queue_sink_pad) != Gst.PadLinkReturn.OK:
        raise RuntimeError(f"无法连接 demux -> {camera_id} 输出分支")

    logger.info(f"[{camera_id}] 输出分支创建完成")

def _create_rtmp_output(self, camera_id, nvosd, push_url):
    """Build the RTMP push chain: nvosd -> encoder -> h264parse -> caps -> queue -> flvmux -> queue -> rtmpsink."""
    # Refuse to build the branch when this pipeline has no usable HW encoder
    if self.encoder_name is None:
        error_msg = (
            f"❌ 摄像头 {camera_id} 无法创建推流输出\n"
            f"   原因: GPU {self.gpu_id} 硬件编码器不可用\n"
            f"   ⚠️  系统拒绝启动该摄像头的推流分支"
        )
        logger.error(error_msg)
        raise RuntimeError(f"GPU {self.gpu_id} 硬件编码器不可用")

    make = Gst.ElementFactory.make
    encoder = make(self.encoder_name, f"encoder-{camera_id}")
    h264parse = make("h264parse", f"h264parse-{camera_id}")
    capsfilter = make("capsfilter", f"capsfilter-{camera_id}")
    queue_rtmp = make("queue", f"queue-rtmp-{camera_id}")
    flvmux = make("flvmux", f"flvmux-{camera_id}")
    queue_sink = make("queue", f"queue-sink-{camera_id}")
    rtmpsink = make("rtmpsink", f"rtmpsink-{camera_id}")

    chain = [encoder, h264parse, capsfilter, queue_rtmp, flvmux, queue_sink, rtmpsink]
    if not all(chain):
        raise RuntimeError(f"无法创建 {camera_id} 的RTMP输出元素")

    encoder.set_property('bitrate', 4000000)
    # nvv4l2h264enc picks up the current GPU context automatically;
    # nvh264enc needs an explicit gpu-id.
    if self.encoder_name == "nvv4l2h264enc":
        pass
    elif self.encoder_name == "nvh264enc":
        encoder.set_property('gpu-id', self.gpu_id)

    capsfilter.set_property(
        "caps", Gst.Caps.from_string("video/x-h264,stream-format=avc,alignment=au"))
    flvmux.set_property("streamable", True)
    rtmpsink.set_property('location', push_url)
    rtmpsink.set_property('sync', False)

    logger.info(f"[{camera_id}] 使用编码器 {self.encoder_name} on GPU {self.gpu_id}")

    for element in chain:
        self.pipeline.add(element)

    # Link nvosd through the whole chain pairwise
    names = ["nvosd", "encoder", "h264parse", "capsfilter",
             "queue_rtmp", "flvmux", "queue_sink", "rtmpsink"]
    full_chain = [nvosd] + chain
    for i in range(len(full_chain) - 1):
        if not full_chain[i].link(full_chain[i + 1]):
            raise RuntimeError(f"无法连接 {camera_id} {names[i]} -> {names[i + 1]}")

    logger.info(f"[{camera_id}] RTMP推流输出创建完成")

def _create_fake_output(self, camera_id, nvosd):
    """Terminate a non-demo branch with an unsynchronized fakesink."""
    sink = Gst.ElementFactory.make("fakesink", f"fakesink-{camera_id}")
    if sink is None:
        raise RuntimeError(f"无法创建 {camera_id} 的fakesink")

    sink.set_property("sync", False)
    self.pipeline.add(sink)

    if not nvosd.link(sink):
        raise RuntimeError(f"无法连接 {camera_id} nvosd -> fakesink")

    logger.info(f"[{camera_id}] fakesink输出创建完成")

def _pgie_src_pad_buffer_probe(self, pad, info, u_data):
    """
    Inference-result probe for the multi-stream batch.

    For each frame in the batch:
      1. Map pad_index back to a camera_id
      2. Parse detections from the object metadata
      3. Demo mode: draw smoothed visualization onto the OSD
      4. Fire the async inference callback with a plain-dict frame meta
      5. Log status every 1000 frames and update pipeline stats

    Always returns Gst.PadProbeReturn.OK so the stream keeps flowing,
    even when frame processing raises.
    """
    try:
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            return Gst.PadProbeReturn.OK

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        # FIX: guard against missing batch meta (consistent with the other
        # probes in this file, which already check this).
        if not batch_meta:
            return Gst.PadProbeReturn.OK

        l_frame = batch_meta.frame_meta_list

        while l_frame is not None:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            frame_number = frame_meta.frame_num
            pad_index = frame_meta.pad_index

            # Reverse-lookup camera_id from pad_index
            camera_id = None
            for cid, idx in self.stream_index_map.items():
                if idx == pad_index:
                    camera_id = cid
                    break

            # FIX: `if not camera_id` also skipped a legitimate falsy
            # camera_id (e.g. 0); only skip when the lookup truly failed.
            if camera_id is None:
                l_frame = l_frame.next
                continue

            # Parse detections for this frame
            detections = self._parse_frame_detections(frame_meta)
            detect_time = time.time()

            # Demo mode: draw every frame from cached results to avoid flicker
            camera_vo = self.active_cameras.get(camera_id)
            if camera_vo and getattr(camera_vo, 'is_demo', False):
                self._smooth_visualization(frame_meta, batch_meta, detections, camera_id)

            # Async inference callback with a plain-dict frame meta
            if self.inference_callback and detections:
                frame_meta_dict = {
                    'frame_num': frame_meta.frame_num,
                    'pad_index': frame_meta.pad_index,
                    'source_frame_width': frame_meta.source_frame_width,
                    'source_frame_height': frame_meta.source_frame_height,
                    'resize_shape': (g_frame_height, g_frame_width),
                    'timestamp': getattr(frame_meta, 'buf_pts', 0),
                    'detect_time': detect_time
                }
                self._async_inference_callback(camera_id, detections, frame_meta_dict)

            # Periodic status log
            if frame_number % 1000 == 0:
                logger.info(f"[{camera_id}] Frame {frame_number}, 检测数: {len(detections)}")

            # Update pipeline running statistics
            self._update_pipeline_stats()

            l_frame = l_frame.next

    except Exception as e:
        logger.error(f"PadProbe处理异常: {e}")
        logger.error(traceback.format_exc())

    return Gst.PadProbeReturn.OK

def _parse_frame_detections(self, frame_meta):
    """
    Parse one frame's object metadata into plain detection dicts.

    Only person and vehicle classes (COCO ids 0, 2, 3, 5, 7) are kept.
    Each detection carries bbox [x1, y1, x2, y2], class/tracking info and
    a "judge_point" used for region membership tests.

    Args:
        frame_meta: pyds.NvDsFrameMeta for one frame.

    Returns:
        list[dict]: parsed detections (possibly empty).
    """
    detections = []
    l_obj = frame_meta.obj_meta_list

    # COCO class ids produced by the YOLOv8 model
    VEHICLE_CLASS_IDS = {2, 3, 5, 7}  # car, motorcycle, bus, truck
    PERSON_CLASS_ID = 0  # person
    ALLOWED_CLASS_IDS = VEHICLE_CLASS_IDS | {PERSON_CLASS_ID}

    while l_obj is not None:
        try:
            obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)

            # Keep only vehicles and persons
            if obj_meta.class_id not in ALLOWED_CLASS_IDS:
                l_obj = l_obj.next
                continue

            # Tracking-id handling — works with and without a tracker
            if g_enable_tracking:
                object_id = obj_meta.object_id
                tracker_confidence = float(obj_meta.tracker_confidence) if obj_meta.tracker_confidence > 0 else None
                is_tracked = obj_meta.object_id != 0xFFFFFFFFFFFFFFFF
            else:
                # No tracker: use the invalid-id sentinel, no tracker confidence
                object_id = 0xFFFFFFFFFFFFFFFF
                tracker_confidence = None
                is_tracked = False

            detection = {
                'bbox': [
                    int(obj_meta.rect_params.left),
                    int(obj_meta.rect_params.top),
                    int(obj_meta.rect_params.left + obj_meta.rect_params.width),
                    int(obj_meta.rect_params.top + obj_meta.rect_params.height)
                ],
                'object_id': object_id,
                'class_id': int(obj_meta.class_id),
                'confidence': float(obj_meta.confidence),
                'tracker_confidence': tracker_confidence,
                'resize_shape': (g_frame_height, g_frame_width),
                'class_name': obj_meta.obj_label if obj_meta.obj_label else f"class_{obj_meta.class_id}",
                'is_tracked': is_tracked
            }
            x1, y1, x2, y2 = detection["bbox"]

            # Judge point: x at bbox center; y at the 3/4 mark of the bbox
            # (vertical center plus a quarter of the height — image y axis
            # points down). FIX: the old comment wrongly described this as
            # "bottom + 1/4 height".
            center_x = (x1 + x2) / 2
            center_y = ((y1 + y2) / 2) + (y2 - y1) / 4
            detection["judge_point"] = (int(center_x), int(center_y))
            detections.append(detection)

        except Exception as e:
            logger.error(f"解析检测对象失败: {e}")

        # FIX: was a bare `except:` — only the StopIteration that pyds
        # raises at the end of the metadata list should end the loop.
        try:
            l_obj = l_obj.next
        except StopIteration:
            break

    return detections


def _parse_frame_reid_yolo(self, frame_mata):
    """
    Parse YOLO detections for the ReID path.

    FIX: the previous body referenced undefined locals (obj_meta,
    object_id, tracker_confidence, is_tracked) and raised NameError on
    every call, returning nothing. It now delegates to
    _parse_frame_detections, which performs the identical per-object
    parsing over the frame's object metadata.

    Args:
        frame_mata: pyds.NvDsFrameMeta (parameter name kept for backward
            compatibility — note the pre-existing 'mata' typo).

    Returns:
        list[dict]: parsed detections (see _parse_frame_detections).
    """
    return self._parse_frame_detections(frame_mata)

def _smooth_visualization(self, frame_meta, batch_meta, detections, camera_id):
    """Demo-mode smooth visualization: draw every frame from cached state to avoid flicker."""
    try:
        from core.rule.config_manager import ConfigManager
        from core.cache.state_cache import get_state_cache

        camera_config = ConfigManager.get_instance().get_camera(camera_id)
        if not camera_config:
            return

        state_cache = get_state_cache()
        width, height = g_frame_width, g_frame_height

        # Scenario 1: parking-spot detection
        if camera_config.park_check_tag == 1 and hasattr(camera_config, 'spots') and camera_config.spots:
            self._draw_parking_spots(frame_meta, batch_meta, camera_id, camera_config,
                                     state_cache, width, height)

        # Scenario 2: queue monitoring
        if camera_config.queue_check_tag == 1:
            self._draw_queue_monitoring(frame_meta, batch_meta, camera_id, camera_config,
                                        state_cache, detections, width, height)

        # Scenario 3: crowd detection — smoothed drawing variant
        if camera_config.crowd_check_tag == 1:
            self._draw_crowd_monitoring_smooth(frame_meta, batch_meta, camera_id, camera_config,
                                               state_cache, detections, width, height)

    except Exception as e:
        logger.error(f"平滑可视化失败: {e}")
        logger.error(traceback.format_exc())

# Per-camera timestamp of the last cache update consumed by the drawing
# path: {camera_id: last_update_time}. NOTE(review): declared at this
# scope it is shared across all instances — confirm that per-instance
# state is not required here.
_camera_last_update_time: Dict[int, float] = {}

# Per-camera timestamp of the last image save: {camera_id: last_save_time}
_camera_last_save_time: Dict[int, float] = {}

def _smooth_car_visualization(self, frame_meta, batch_meta, camera_id):
    """
    Demo-mode vehicle overlay drawn every frame from the in-memory track cache.

    Steps:
      1. Read this camera's previous update timestamp
      2. Select cached vehicles updated at/after that timestamp
      3. Draw bounding box, judge point and plate/track-id label for each
    """
    try:
        from core.rule.config_manager import ConfigManager
        from core.cache.vehicle_track_cache import VehicleTrackCacheManager

        config_manager = ConfigManager.get_instance()
        camera_config = config_manager.get_camera(camera_id)
        if not camera_config:
            return

        frame_width = g_frame_width
        frame_height = g_frame_height

        # All cached vehicles for this camera; car_latest=1 rows carry the
        # feature payload used by the RTMP drawing path.
        cache = VehicleTrackCacheManager.get_instance()
        vehicles = cache.search_by_camera(camera_id, car_latest=1)
        if not vehicles:
            return

        # Previous update timestamp for this camera
        prev_update_time = self._camera_last_update_time.get(camera_id, 0)

        # Newest last_update_time across the cached vehicles
        latest_time = max(((v.get('last_update_time') or 0) for v in vehicles), default=0)
        if latest_time == 0:
            return

        # Remember the newest timestamp for the next pass
        self._camera_last_update_time[camera_id] = latest_time

        # First pass: vehicles at the newest timestamp (0.1 s tolerance);
        # later passes: anything updated since the previous pass.
        if prev_update_time == 0:
            fresh = [v for v in vehicles
                     if abs((v.get('last_update_time') or 0) - latest_time) <= 0.1]
        else:
            fresh = [v for v in vehicles
                     if (v.get('last_update_time') or 0) >= prev_update_time - 0.1]

        for vehicle in fresh:
            box = vehicle.get('bbox') or vehicle.get('last_position')
            if not box or len(box) < 4:
                continue

            plate_no = vehicle.get('vehicle_plate_no', '')
            g_track_id = vehicle.get('g_track_id', 0)

            # Cyan bounding box
            self._draw_rectangle_osd(frame_meta, batch_meta, box, 0.0, 1.0, 1.0)

            # Red judge point (center x, 3/4 of the bbox height)
            px = (box[0] + box[2]) // 2
            py = (box[1] + box[3]) // 2 + (box[3] - box[1]) // 4
            self._draw_circle_osd(frame_meta, batch_meta, px, py, 5, 1.0, 0.0, 0.0)

            # Label: plate number when known, otherwise the global track id
            label = f"{plate_no}" if plate_no else f"ID:{g_track_id}"

            # Label above the bounding box
            self._draw_label_osd(frame_meta, batch_meta, box[0], box[1] - 25, label)

    except Exception as e:
        logger.error(f"车辆可视化失败: {e}")
        logger.error(traceback.format_exc())

def _image_save_probe(self, pad, info, u_data):
    """
    Probe on the image-save tee branch.

    For each frame in the batch: if a save flag was posted for the frame's
    camera (self.image_save_flags, guarded by self.image_save_lock), copy
    the frame surface to a numpy array, convert it to BGR and hand it to
    _smooth_car_visualization_image for drawing and disk write.

    Always returns Gst.PadProbeReturn.OK so the stream keeps flowing.
    """
    try:
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            return Gst.PadProbeReturn.OK

        # Batch metadata attached by nvstreammux
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        if not batch_meta:
            return Gst.PadProbeReturn.OK

        # Walk every frame in the batch
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                camera_id = frame_meta.source_id

                # Consume the save flag under the lock so a single request
                # produces exactly one saved image.
                should_save = False
                with self.image_save_lock:
                    if camera_id in self.image_save_flags:
                        should_save = True
                        # Clear the flag to avoid duplicate saves
                        del self.image_save_flags[camera_id]
                        logger.debug(f"[ImageSave] 检测到保存标志: camera_id={camera_id}")

                # No pending request for this camera — skip the frame
                if not should_save:
                    l_frame = l_frame.next
                    continue

                # Map the frame surface into host memory
                n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                if n_frame is None:
                    logger.warning(f"[ImageSave] 无法获取帧数据: camera_id={camera_id}")
                    l_frame = l_frame.next
                    continue

                # Deep-copy so the data outlives the mapped surface
                frame_image = np.array(n_frame, copy=True, order='C')

                # Convert to BGR for OpenCV. NOTE(review): the conversion
                # uses COLOR_RGBA2BGR while nearby comments describe the
                # surface as BGRx — confirm the actual surface format, as a
                # mismatch would swap red/blue channels in saved images.
                frame_bgr = cv2.cvtColor(frame_image, cv2.COLOR_RGBA2BGR)

                # Draw and save; reads the latest data straight from cache
                self._smooth_car_visualization_image(frame_bgr, camera_id)

                l_frame = l_frame.next
            except StopIteration:
                break
            except Exception as e:
                logger.error(f"[ImageSave] 处理帧异常: {e}")
                logger.error(traceback.format_exc())
                try:
                    l_frame = l_frame.next
                except:
                    break

        return Gst.PadProbeReturn.OK

    except Exception as e:
        logger.error(f"[ImageSave] Probe异常: {e}")
        logger.error(traceback.format_exc())
        return Gst.PadProbeReturn.OK

def _smooth_car_visualization_image(self, frame_bgr, camera_id):
    """
    Draw and save one annotated image per recently-updated vehicle.

    Steps:
      1. Fetch this camera's cached vehicles (car_latest=1 rows)
      2. Select vehicles updated since the last save pass
      3. For each, draw bbox / judge point / plate label / timestamp on a
         copy of the frame and write it under car_detail/YYYY/MM/DD/

    Args:
        frame_bgr: BGR numpy image of the current frame.
        camera_id: camera whose cache is consulted.
    """
    try:
        import cv2
        import os
        from datetime import datetime
        from core.cache.vehicle_track_cache import VehicleTrackCacheManager

        # Cached vehicles for this camera; car_latest=1 rows carry the
        # feature payload used by the image-save path.
        track_cache_mgr = VehicleTrackCacheManager.get_instance()
        cached_vehicles = track_cache_mgr.search_by_camera(camera_id, car_latest=1)

        if not cached_vehicles:
            return

        # Previous save timestamp for this camera
        prev_save_time = self._camera_last_save_time.get(camera_id, 0)

        # Newest last_update_time across the cached vehicles
        latest_time = 0
        for v in cached_vehicles:
            update_time = v.get('last_update_time') or 0
            if update_time > latest_time:
                latest_time = update_time

        if latest_time == 0:
            return

        # Remember the newest timestamp for the next pass
        self._camera_last_save_time[camera_id] = latest_time

        # First pass: vehicles at the newest timestamp (0.1 s tolerance);
        # later passes: anything updated since the previous pass.
        if prev_save_time == 0:
            valid_vehicles = [v for v in cached_vehicles
                            if abs((v.get('last_update_time') or 0) - latest_time) <= 0.1]
        else:
            valid_vehicles = [v for v in cached_vehicles
                            if (v.get('last_update_time') or 0) >= prev_save_time - 0.1]

        if not valid_vehicles:
            return

        # Save directory laid out as car_detail/YYYY/MM/DD/
        now = datetime.now()
        year = now.strftime("%Y")
        month = now.strftime("%m")
        day = now.strftime("%d")

        base_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "car_detail")
        save_dir = os.path.join(base_dir, year, month, day)
        os.makedirs(save_dir, exist_ok=True)

        # One annotated image per vehicle
        for idx, vehicle in enumerate(valid_vehicles):
            bbox = vehicle.get('bbox') or vehicle.get('last_position')
            if not bbox or len(bbox) < 4:
                continue

            # Independent copy so each vehicle gets its own clean image
            frame_copy = frame_bgr.copy()

            plate_no = vehicle.get('vehicle_plate_no', '')
            g_track_id = vehicle.get('g_track_id', 0)

            # Cyan bounding box
            x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
            cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (255, 255, 0), 2)

            # Red judge point (center x, 3/4 of the bbox height)
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2 + (y2 - y1) // 4
            cv2.circle(frame_copy, (cx, cy), 5, (0, 0, 255), -1)

            # Label: plate number when known, otherwise the global track id
            if plate_no:
                label = f"{plate_no}"
            else:
                label = f"ID:{g_track_id}"

            # Solid black label background
            label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(frame_copy, (x1, y1 - 25), (x1 + label_size[0], y1), (0, 0, 0), -1)

            # White label text
            cv2.putText(frame_copy, label, (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Green timestamp in the top-left corner
            timestamp_text = now.strftime("%Y-%m-%d %H:%M:%S")
            cv2.putText(frame_copy, timestamp_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Filename: <time>_<plate>.jpg or <time>_ID<track>.jpg
            time_str = now.strftime("%Y%m%d%H%M%S")

            if plate_no:
                filename = f"{time_str}_{plate_no}.jpg"
            else:
                filename = f"{time_str}_ID{g_track_id}.jpg"

            filepath = os.path.join(save_dir, filename)

            # FIX: cv2.imwrite returns False on failure (bad path, encoder
            # error) — previously a failed write was still logged as success.
            if cv2.imwrite(filepath, frame_copy):
                logger.info(f"[ImageSave] 保存车辆图片: {filepath}")
            else:
                logger.warning(f"[ImageSave] 图片写入失败: {filepath}")

    except Exception as e:
        logger.error(f"[ImageSave] 图片保存失败: camera_id={camera_id}, error={e}")
        logger.error(traceback.format_exc())

# DEPRECATED: superseded by _smooth_visualization
def _sync_visualization(self, frame_meta, batch_meta, detections, camera_id):
    """
    Synchronous demo-mode visualization drawn onto the OSD.

    Deprecated — use _smooth_visualization instead.
    """
    try:
        from core.rule.config_manager import ConfigManager
        from core.cache.state_cache import get_state_cache

        config_manager = ConfigManager.get_instance()
        state_cache = get_state_cache()
        camera_config = config_manager.get_camera(camera_id)

        if not camera_config:
            logger.warning(f"未找到摄像头 {camera_id} 的配置")
            return

        width, height = g_frame_width, g_frame_height

        # Scenario 1: parking-spot detection
        if camera_config.park_check_tag == 1 and hasattr(camera_config, 'spots') and camera_config.spots:
            self._draw_parking_spots(frame_meta, batch_meta, camera_id, camera_config,
                                     state_cache, width, height)

        # Scenario 2: queue monitoring
        if camera_config.queue_check_tag == 1:
            self._draw_queue_monitoring(frame_meta, batch_meta, camera_id, camera_config,
                                        state_cache, detections, width, height)

        # Scenario 3: crowd detection (non-smoothed variant)
        if camera_config.crowd_check_tag == 1:
            self._draw_crowd_monitoring(frame_meta, batch_meta, camera_id, camera_config,
                                        state_cache, detections, width, height)

        # Filtered-detections overlay intentionally disabled:
        # self._draw_filtered_detections(frame_meta, batch_meta, detections)

    except Exception as e:
        logger.error(f"同步可视化失败: {e}")
        logger.error(traceback.format_exc())

def _draw_parking_spots(self, frame_meta, batch_meta, camera_id, camera_config, state_cache, frame_width,
                        frame_height):
    """
    Draw parking-spot polygons onto the OSD.

    Percent-based spot coordinates from camera_config are scaled to pixel
    coordinates; occupied spots are outlined in red, vacant ones in green.

    FIX: removed a dead zero-arg `_draw_parking_spots(self)` stub that was
    defined immediately above and silently shadowed by this definition.
    """
    try:
        parking_spot_states = state_cache.get_spot_states_by_camera_id(camera_id)

        for spot_id, spot_config in camera_config.spots.items():
            if hasattr(spot_config, 'coordinates') and spot_config.coordinates:
                # Convert percent coordinates to pixel coordinates; points
                # may be {'x':..,'y':..} dicts or [x, y] lists.
                pixel_coords = []
                for point in spot_config.coordinates:
                    if isinstance(point, dict) and 'x' in point and 'y' in point:
                        pixel_x = int((point['x'] / 100) * frame_width)
                        pixel_y = int((point['y'] / 100) * frame_height)
                        pixel_coords.append([pixel_x, pixel_y])
                    elif isinstance(point, list) and len(point) >= 2:
                        pixel_x = int((point[0] / 100) * frame_width)
                        pixel_y = int((point[1] / 100) * frame_height)
                        pixel_coords.append([pixel_x, pixel_y])

                if len(pixel_coords) >= 3:  # need at least 3 points for a polygon
                    # Spot status from the state cache; default is vacant
                    current_spot_status = "vacant"
                    spot_state = parking_spot_states.get(spot_id)
                    if spot_state and 'current_status' in spot_state:
                        current_spot_status = spot_state['current_status']

                    # Red when occupied, green when vacant
                    if current_spot_status == "occupied":
                        color_r, color_g, color_b = 1.0, 0.0, 0.0
                    else:
                        color_r, color_g, color_b = 0.0, 1.0, 0.0

                    # Outline the polygon via DeepStream OSD lines
                    self._draw_polygon_osd(frame_meta, batch_meta, pixel_coords, color_r, color_g, color_b,
                                           alpha=0.5)

    except Exception as e:
        logger.error(f"绘制停车位失败: {e}")

def _draw_queue_monitoring(self, frame_meta, batch_meta, camera_id, camera_config, state_cache, detections,
                           frame_width, frame_height):
    """Render per-area queue status lines (vehicle counts) in the lower-left corner."""
    try:
        status_lines = []

        if hasattr(camera_config, 'queue_areas') and camera_config.queue_areas:
            for area_id, area_config in camera_config.queue_areas.items():
                # Vehicle count from the cached area state (0 when absent)
                area_state = state_cache.get_queue_area_state(area_id)
                vehicle_count = area_state['count'] if area_state and 'count' in area_state else 0

                # Congestion level by vehicle count
                if vehicle_count <= 5:
                    status = "畅通"
                elif vehicle_count <= 10:
                    status = "轻度拥堵"
                else:
                    status = "严重拥堵"

                status_lines.append(f"{area_id}: {vehicle_count}辆 ({status})")

        # No configured queue areas: fall back to the total detection count
        if not status_lines:
            total_vehicles = len(detections)
            status_lines.append(f"检测车辆: {total_vehicles}辆")

        # Draw the status text in the lower-left corner
        self._draw_text_osd(frame_meta, batch_meta, status_lines, frame_width, frame_height)

    except Exception as e:
        logger.error(f"绘制排队监测信息失败: {e}")

def _draw_vehicle_detections(self, frame_meta, batch_meta, detections):
    """Draw per-vehicle judge points and labels onto the OSD (bbox drawing disabled)."""
    try:
        for det in detections:
            bbox = det['bbox']
            confidence = det.get('confidence', 0.0)
            class_id = det.get('class_id', 0)

            # Bounding-box overlay intentionally disabled:
            # self._draw_rectangle_osd(frame_meta, batch_meta, bbox, 0.0, 1.0, 1.0)

            # Red judge point at (center x, 3/4 of the bbox height)
            cx = (bbox[0] + bbox[2]) // 2
            cy = (bbox[1] + bbox[3]) // 2 + (bbox[3] - bbox[1]) // 4
            self._draw_circle_osd(frame_meta, batch_meta, cx, cy, 5, 1.0, 0.0, 0.0)

            # Label includes the tracking id when the object is tracked
            object_id = det.get('object_id', 0)
            if det.get('is_tracked', False):
                label = f"ID:{object_id} {class_id} {confidence:.2f}"
            else:
                label = f"{class_id} {confidence:.2f}"
            self._draw_label_osd(frame_meta, batch_meta, bbox[0], bbox[1] - 10, label)

    except Exception as e:
        logger.error(f"绘制车辆检测结果失败: {e}")

def _draw_polygon_osd(self, frame_meta, batch_meta, pixel_coords, color_r, color_g, color_b, alpha=0.5):
    """
    Draw a closed polygon outline using DeepStream OSD line primitives.

    One display meta holds at most 16 lines (MAX_ELEMENTS_IN_DISPLAY_META),
    so additional display metas are acquired as needed.

    FIX: the original broke out of the loop once 16 lines were written,
    silently truncating polygons with more than 16 edges; now every edge
    is drawn. Behavior for polygons with <= 16 edges is unchanged.
    """
    try:
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)

        if len(pixel_coords) >= 2:
            for i in range(len(pixel_coords)):
                # Current display meta is full — flush it and start a new one
                if display_meta.num_lines >= 16:
                    pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
                    display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)

                next_i = (i + 1) % len(pixel_coords)  # wrap to close the polygon
                line_params = display_meta.line_params[display_meta.num_lines]
                line_params.x1 = pixel_coords[i][0]
                line_params.y1 = pixel_coords[i][1]
                line_params.x2 = pixel_coords[next_i][0]
                line_params.y2 = pixel_coords[next_i][1]
                line_params.line_width = 5
                line_params.line_color.red = color_r
                line_params.line_color.green = color_g
                line_params.line_color.blue = color_b
                line_params.line_color.alpha = alpha
                display_meta.num_lines += 1

        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

    except Exception as e:
        logger.error(f"绘制多边形失败: {e}")

def _draw_rectangle_osd(self, frame_meta, batch_meta, bbox, color_r, color_g, color_b):
    """Draw one bounding box given as [x1, y1, x2, y2] via DeepStream OSD."""
    try:
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]

        rect = display_meta.rect_params[0]
        rect.left = x1
        rect.top = y1
        rect.width = x2 - x1
        rect.height = y2 - y1
        rect.border_width = 2
        rect.border_color.red = color_r
        rect.border_color.green = color_g
        rect.border_color.blue = color_b
        rect.border_color.alpha = 0.5
        rect.has_bg_color = 0

        display_meta.num_rects = 1
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

    except Exception as e:
        logger.error(f"绘制矩形失败: {e}")

def _draw_circle_osd(self, frame_meta, batch_meta, cx, cy, radius, color_r, color_g, color_b):
    """Draw a circle centered at (cx, cy) via DeepStream OSD."""
    try:
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)

        circle = display_meta.circle_params[0]
        circle.xc = cx
        circle.yc = cy
        circle.radius = radius
        circle.circle_color.red = color_r
        circle.circle_color.green = color_g
        circle.circle_color.blue = color_b
        circle.circle_color.alpha = 0.5
        circle.has_bg_color = 0

        display_meta.num_circles = 1
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

    except Exception as e:
        logger.error(f"绘制圆形失败: {e}")

def _draw_label_osd(self, frame_meta, batch_meta, x, y, text):
    """Draw a white text label on a translucent black background at (x, y)."""
    try:
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)

        txt = display_meta.text_params[0]
        txt.display_text = text
        txt.x_offset = x
        txt.y_offset = y

        font = txt.font_params
        font.font_name = "Serif"
        font.font_size = 12
        font.font_color.red = 1.0
        font.font_color.green = 1.0
        font.font_color.blue = 1.0
        font.font_color.alpha = 1.0

        # Semi-transparent black backdrop for readability.
        txt.set_bg_clr = 1
        txt.text_bg_clr.red = 0.0
        txt.text_bg_clr.green = 0.0
        txt.text_bg_clr.blue = 0.0
        txt.text_bg_clr.alpha = 0.5

        display_meta.num_labels = 1
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

    except Exception as e:
        logger.error(f"绘制文本标签失败: {e}")

def _draw_text_osd(self, frame_meta, batch_meta, text_lines, frame_width, frame_height):
    """在左下角绘制多行状态文本"""
    try:
        # 计算左下角位置
        start_x = 20
        start_y = frame_height - 20 - (len(text_lines) * 25)  # 每行25像素高度

        for i, text in enumerate(text_lines):
            y_pos = start_y + (i * 25)
            self._draw_label_osd(frame_meta, batch_meta, start_x, y_pos, text)

    except Exception as e:
        logger.error(f"绘制状态文本失败: {e}")

def _draw_crowd_monitoring_smooth(self, frame_meta, batch_meta, camera_id, camera_config,
                                  state_cache, detections, frame_width, frame_height):
    """
    平滑绘制人员聚集监测可视化 - 基于缓存数据,每帧绘制,避免闪烁

    可视化效果:
    1. 默认人员:红色边界框
    2. 聚集状态人员:蓝色边界框
    3. 检测到聚集(未确认):紫色边界框
    4. 确认聚集状态:绿色边界框
    5. 聚集区域信息:显示人数和状态
    """
    try:
        # 1. 过滤人员检测结果
        person_detections = [d for d in detections if d.get('class_name') == 'person']

        # 2. 获取聚集可视化数据(基于缓存,不依赖当前帧的检测结果)
        crowd_vis_data = {}
        if hasattr(camera_config, 'crowd_areas') and camera_config.crowd_areas:
            for area_id in camera_config.crowd_areas.keys():
                cache_key = f"crowd_vis_{camera_id}_{area_id}"
                vis_data = state_cache.get_state(cache_key)
                if vis_data:
                    # 检查数据是否过期(超过5秒的数据不显示)
                    current_time = time.time()
                    data_time = vis_data.get('timestamp', 0)
                    if current_time - data_time <= 5.0:  # 5秒内的数据有效
                        crowd_vis_data[area_id] = vis_data

        # 3. 绘制人员边界框 - 补充缺失的蓝色标注逻辑
        for person in person_detections:
            # 判断人员是否在聚集区域内
            is_in_crowd = self._is_person_in_crowd_region(person, crowd_vis_data)

            if is_in_crowd:
                # 聚集状态:蓝色边界框
                self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 0.0, 0.0, 1.0)
            else:
                # 正常状态:红色边界框
                self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 1.0, 0.0, 0.0)

        # 4. 绘制聚集区域边界和信息
        for area_id, vis_data in crowd_vis_data.items():
            if vis_data.get('crowd_region_bbox'):
                crowd_bbox = vis_data['crowd_region_bbox']
                person_count = vis_data.get('person_count', 0)
                is_confirmed = vis_data.get('confirmed_crowded', False)  # 是否已确认聚集
                max_density = vis_data.get('max_density', 0.0)

                if is_confirmed:
                    # 确认聚集状态:绿色边界框
                    self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.0, 1.0, 0.0)
                    label = f"crowed: {person_count} density:{max_density:.1f}"
                else:
                    # 检测到聚集但未确认:紫色边界框
                    self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.5, 0.0, 1.0)
                    label = f"crow: {person_count} density:{max_density:.1f}"

                # 在聚集区域上方显示人数信息
                self._draw_label_osd(frame_meta, batch_meta, crowd_bbox[0], crowd_bbox[1] - 25, label)

    except Exception as e:
        logger.error(f"平滑绘制人员聚集监测失败: {e}")

# 废弃
def _draw_crowd_monitoring(self, frame_meta, batch_meta, camera_id, camera_config,
                           state_cache, detections, frame_width, frame_height):
    """
    绘制人员聚集监测可视化(已弃用,使用 _draw_crowd_monitoring_smooth )

    可视化效果:
    1. 默认人员:红色边界框
    2. 聚集状态人员:蓝色边界框
    3. 检测到聚集(未确认):紫色边界框
    4. 确认聚集状态:绿色边界框
    """
    try:
        # 1. 过滤人员检测结果
        person_detections = [d for d in detections if d.get('class_name') == 'person']

        # 2. 获取聚集可视化数据
        crowd_vis_data = {}
        if hasattr(camera_config, 'crowd_areas') and camera_config.crowd_areas:
            for area_id in camera_config.crowd_areas.keys():
                cache_key = f"crowd_vis_{camera_id}_{area_id}"
                vis_data = state_cache.get_state(cache_key)
                if vis_data:
                    crowd_vis_data[area_id] = vis_data

        # 3. 绘制人员边界框
        for person in person_detections:
            # 判断人员是否在聚集区域内
            is_in_crowd = self._is_person_in_crowd_region(person, crowd_vis_data)

            if is_in_crowd:
                # 聚集状态:蓝色边界框
                self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 0.0, 0.0, 1.0)
            else:
                # 正常状态:红色边界框
                self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 1.0, 0.0, 0.0)

        # 4. 绘制聚集区域边界
        for area_id, vis_data in crowd_vis_data.items():
            if vis_data.get('crowd_region_bbox'):
                crowd_bbox = vis_data['crowd_region_bbox']
                person_count = vis_data.get('person_count', 0)
                is_confirmed = vis_data.get('confirmed_crowded', False)  # 是否已确认聚集

                if is_confirmed:
                    # 确认聚集状态:绿色边界框
                    self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.0, 1.0, 0.0)
                    label = f"确认聚集: {person_count}人"
                else:
                    # 检测到聚集但未确认:紫色边界框
                    self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.5, 0.0, 1.0)
                    label = f"检测聚集: {person_count}人"

                # 在聚集区域上方显示人数信息
                self._draw_label_osd(frame_meta, batch_meta, crowd_bbox[0], crowd_bbox[1] - 25, label)

    except Exception as e:
        logger.error(f"绘制人员聚集监测失败: {e}")

def _is_person_in_crowd_region(self, person: Dict, crowd_vis_data: Dict) -> bool:
    """判断人员是否在聚集区域内"""
    try:
        person_bbox = person['bbox']
        person_center_x = (person_bbox[0] + person_bbox[2]) / 2
        person_center_y = (person_bbox[1] + person_bbox[3]) / 2

        for area_id, vis_data in crowd_vis_data.items():
            if vis_data.get('crowd_detected') and vis_data.get('crowd_region_bbox'):
                crowd_bbox = vis_data['crowd_region_bbox']

                # 检查人员中心点是否在聚集区域内
                if (crowd_bbox[0] <= person_center_x <= crowd_bbox[2] and
                        crowd_bbox[1] <= person_center_y <= crowd_bbox[3]):
                    return True

        return False
    except Exception as e:
        logger.error(f"判断人员是否在聚集区域失败: {e}")
        return False

def _async_inference_callback(self, camera_id, detections, frame_meta_dict):
    """异步执行推理回调"""
    try:
        if self.inference_callback and self.running and self.thread_pool:
            # 检查线程池是否已关闭
            if not self.thread_pool._shutdown:
                self.thread_pool.submit(
                    self._safe_inference_callback,
                    camera_id, detections, frame_meta_dict
                )
            else:
                logger.debug(f"线程池已关闭,跳过推理回调: {camera_id}")
    except RuntimeError as e:
        if "cannot schedule new futures after interpreter shutdown" in str(e):
            logger.debug("解释器正在关闭,跳过推理回调")
            # 设置标志防止后续提交
            self.running = False
        else:
            logger.error(f"提交异步推理回调失败: {e}")
    except Exception as e:
        logger.error(f"提交异步推理回调失败: {e}")

def _safe_inference_callback(self, camera_id, detections, frame_meta_dict):
    """安全的推理回调执行"""
    try:
        self.inference_callback(camera_id, detections, frame_meta_dict)
    except Exception as e:
        logger.error(f"推理回调执行失败: {e}")
        logger.error(f"回调异常详情: {traceback.format_exc()}")

def _bus_call(self, bus, message, loop):
    """GStreamer bus watch: map bus messages to logging / shutdown actions."""
    msg_type = message.type

    if msg_type == Gst.MessageType.EOS:
        logger.info("Pipeline End-of-stream")
        loop.quit()

    elif msg_type == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        logger.warning(f"Pipeline Warning: {err}: {debug}")

    elif msg_type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        error_message = str(err)
        logger.error(f"Pipeline Error: {err}: {debug}")

        # Classify the error; fatal ones exit so the Docker watchdog restarts us.
        if self._analyze_error_severity(error_message, debug):
            logger.error("Pipeline Error 严重错误,Agent即将退出 - Docker将重启容器")
            logger.error(f"错误详情: {error_message}")
            logger.error(f"调试信息: {debug}")
            sys.exit(1)  # rely on the container restart policy
        else:
            logger.warning("Pipeline Error 非严重错误,继续运行")
            # Recoverable errors get best-effort handling instead of an exit.
            self._handle_recoverable_error(error_message, debug)

    elif msg_type == Gst.MessageType.ELEMENT:
        # The custom frame-diff plugin posts "framediff-motion" element messages.
        struct = message.get_structure()
        if struct and struct.get_name() == "framediff-motion":
            self._handle_framediff_motion(struct)

    return True

def _handle_framediff_motion(self, struct):
    """
    处理帧差法运动检测消息

    消息结构:
    - frame-number: 帧编号
    - motion-pixels: 运动像素数
    - total-pixels: 总像素数
    - motion-ratio: 运动比例
    - has-motion: 是否检测到运动
    """
    try:
        frame_number = struct.get_uint64("frame-number")[1]
        motion_pixels = struct.get_uint64("motion-pixels")[1]
        total_pixels = struct.get_uint64("total-pixels")[1]
        motion_ratio = struct.get_double("motion-ratio")[1]
        has_motion = struct.get_boolean("has-motion")[1]

        # 调试输出(每 30 帧输出一次)
        if frame_number % 30 == 0:
            logger.info(
                f"[FrameDiff] Frame#{frame_number}: "
                f"{motion_pixels}/{total_pixels} ({motion_ratio*100:.2f}%) "
                f"{'🟢 MOTION' if has_motion else '⚪ STILL'}"
            )

        # 可以在这里添加运动检测的业务逻辑
        # 例如:只在检测到运动时才进行 YOLO 推理
        # 或者:记录运动事件到数据库

    except Exception as e:
        logger.warning(f"处理帧差法消息失败: {e}")

def _analyze_error_severity(self, error_message: str, debug_info: str) -> bool:
    """
    Classify a pipeline error and decide whether the process must exit
    (a True return leads to sys.exit so the Docker watchdog restarts us).

    Args:
        error_message: error text from the bus message
        debug_info: GStreamer debug string (may be empty)

    Returns:
        bool: True -> fatal, exit; False -> recoverable, keep running.
    """
    haystack_error = error_message.lower()
    haystack_debug = (debug_info or "").lower()

    def _first_match(patterns):
        # First pattern found in either the error text or the debug text.
        for p in patterns:
            if p in haystack_error or p in haystack_debug:
                return p
        return None

    # Errors that should NOT take down the whole process — mostly transient
    # network / RTSP issues on individual camera streams.
    recoverable_patterns = (
        "could not open resource for reading and writing",
        "timeout while waiting for server response",
        "failed to connect",
        "connection refused",
        "network unreachable",
        "host unreachable",
        "rtsp source error",
        "gst_rtspsrc_retrieve_sdp",
        "resource busy",
        "buffering",
    )
    matched = _first_match(recoverable_patterns)
    if matched:
        logger.info(f"检测到可恢复错误模式: {matched}")
        return False

    # Errors that warrant an exit so Docker restarts the container.
    critical_patterns = (
        "out of memory",
        "cuda error",
        "gpu error",
        "inference engine error",
        "pipeline creation failed",
        "element linking failed",
        "segmentation fault",
        "core dumped",
        "nvstreammux",
        "nvinfer",
        "nvtracker",
        "gstreamer-critical",
        "tensorrt",
        "deepstream",
    )
    matched = _first_match(critical_patterns)
    if matched:
        logger.error(f"检测到严重错误模式: {matched} - 将触发容器重启")
        return True

    # Unrecognized errors: too many within the sliding window is also fatal.
    self._track_error_frequency(error_message)
    if self._is_error_frequency_critical():
        logger.error("错误频率过高,认定为严重故障 - 将触发容器重启")
        return True

    # Unknown and infrequent: stay up and just warn.
    logger.warning(f"未知错误类型,暂不退出: {error_message}")
    return False

def _track_error_frequency(self, error_message: str):
    """
    跟踪错误发生频率 (Docker环境故障检测)

    Args:
        error_message: 错误消息
    """
    try:
        current_time = time.time()
        self.error_timestamps.append(current_time)

        # 清理过期的错误记录(超过时间窗口的)
        cutoff_time = current_time - self.error_frequency_window
        self.error_timestamps = [ts for ts in self.error_timestamps if ts >= cutoff_time]

    except Exception as e:
        logger.error(f"跟踪错误频率时出现异常: {e}")

def _is_error_frequency_critical(self) -> bool:
    """
    检查错误频率是否达到严重级别 (Docker环境故障检测)

    Returns:
        bool: True 表示错误频率过高,False 表示正常
    """
    try:
        current_time = time.time()
        cutoff_time = current_time - self.error_frequency_window

        # 统计时间窗口内的错误数量
        recent_errors = [ts for ts in self.error_timestamps if ts >= cutoff_time]
        error_count = len(recent_errors)

        if error_count >= self.critical_error_threshold:
            logger.warning(f"错误频率过高: {error_count}个错误在{self.error_frequency_window}秒内")
            return True

        return False

    except Exception as e:
        logger.error(f"检查错误频率时出现异常: {e}")
        return False

def _handle_recoverable_error(self, error_message: str, debug_info: str):
    """
    React to a non-fatal pipeline error: identify the failing camera when
    possible and log it for later retry handling.

    Args:
        error_message: error text from the bus message
        debug_info: GStreamer debug string used to locate the source bin
    """
    try:
        failed_camera_id = self._extract_camera_id_from_error(debug_info)

        if failed_camera_id:
            # Reconnect logic / error counters could hook in here; for now
            # the failure is only logged.
            logger.warning(f"摄像头 {failed_camera_id} 出现错误,将在稍后重试连接")
        else:
            logger.warning("无法确定出错的摄像头,错误已记录")

    except Exception as e:
        logger.error(f"处理可恢复错误时出现异常: {e}")

def _extract_camera_id_from_error(self, debug_info: str) -> Optional[str]:
    """
    从错误调试信息中提取摄像头ID

    Args:
        debug_info: 调试信息

    Returns:
        Optional[str]: 摄像头ID,如果无法提取则返回None
    """
    try:
        import re

        # 尝试匹配 source-bin-{camera_id} 模式
        pattern = r'source-bin-([^/\s]+)'
        match = re.search(pattern, debug_info)

        if match:
            camera_id = match.group(1)
            logger.debug(f"从错误信息中提取到摄像头ID: {camera_id}")
            return camera_id

        return None

    except Exception as e:
        logger.error(f"提取摄像头ID时出现异常: {e}")
        return None

def start(self):
    """启动pipeline"""
    # Launches the GStreamer pipeline on a dedicated daemon thread, then
    # sleeps ~3 s so callers see an (optimistically) started pipeline.
    if self.running:
        logger.warning("Pipeline已在运行")
        return

    logger.info("启动多输出pipeline")

    def run_pipeline():
        # Thread body: build the pipeline, attach the bus watch, and drive
        # the GLib main loop until quit or error.
        try:
            self.create_pipeline()

            # 设置bus
            self.bus = self.pipeline.get_bus()
            self.bus.add_signal_watch()
            self.loop = GLib.MainLoop()
            self.bus.connect("message", self._bus_call, self.loop)

            # 启动pipeline
            # Two-step state change (PAUSED, then PLAYING); FAILURE at
            # either step aborts startup via RuntimeError.
            ret = self.pipeline.set_state(Gst.State.PAUSED)
            if ret == Gst.StateChangeReturn.FAILURE:
                raise RuntimeError("无法设置pipeline为PAUSED状态")

            ret = self.pipeline.set_state(Gst.State.PLAYING)
            if ret == Gst.StateChangeReturn.FAILURE:
                raise RuntimeError("无法设置pipeline为PLAYING状态")

            self.running = True
            # Reset throughput-statistics counters at startup.
            self._stats_last_report_time = time.time()
            self._stats_frames_since_report = 0
            logger.info("多输出Pipeline启动成功")

            # 运行主循环
            # Blocks here until loop.quit() (e.g. from stop() or EOS).
            self.loop.run()

        except Exception as e:
            logger.error(f"Pipeline运行异常: {e}")
            logger.error(traceback.format_exc())
            sys.exit(1)  # 直接退出,由watchdog重启
        finally:
            self.running = False
            logger.info("Pipeline线程结束")
            # NOTE(review): sys.exit() in a non-main thread only raises
            # SystemExit in THIS thread — it does not terminate the process,
            # so the intended watchdog-driven restart may never happen.
            # Also, this finally-branch runs even after a clean stop().
            # Verify the intent; os._exit(1) would be a hard process exit.
            sys.exit(1)  # 直接退出,由watchdog重启

    self.thread = threading.Thread(target=run_pipeline, daemon=True)
    self.thread.start()

    # 等待pipeline启动
    # Fixed grace period; does not confirm that PLAYING was actually reached.
    time.sleep(3)

def stop(self):
    """停止pipeline (idempotent: returns immediately when not running)."""
    if not self.running:
        return

    logger.info("停止pipeline")
    # Clear the flag first so no new callbacks are submitted during teardown.
    self.running = False

    try:
        # Drop the pipeline to NULL state.
        pipeline = self.pipeline
        if pipeline:
            logger.debug("停止pipeline")
            pipeline.set_state(Gst.State.NULL)

        # Quit the GLib main loop so the pipeline thread can finish.
        main_loop = self.loop
        if main_loop:
            logger.debug("退出主循环")
            main_loop.quit()

        # The thread pool is intentionally left alive; it is only torn down
        # when the whole process exits.

    except Exception as e:
        logger.error(f"停止pipeline异常: {e}")
        logger.error(f"停止异常详情: {traceback.format_exc()}")

    logger.info("Pipeline停止完成")

class DeepStreamPipelineManager:
    """
    DeepStream Pipeline 管理器 (重构版)
    基于单Pipeline多输出架构
    """

def __init__(self, frame_diff_threshold=30, frame_diff_iou=0.3, debug_image=False):
    """
    初始化 DeepStream Pipeline 管理器

    Args:
        frame_diff_threshold: 帧差法阈值 (默认30)
        frame_diff_iou: 帧差法交集阈值 (默认0.3)
        debug_image: 是否启用图片保存调试 (默认False)
    """
    # Active pipeline instance; created later, not in __init__.
    self.current_pipeline = None
    self.active_cameras = {}  # {camera_id: camera_vo}
    # Callbacks wired in by the caller; invoked from pipeline probes.
    self.inference_callback = None
    self.vis_callback = None
    self.lock = threading.Lock()

    # 保存帧差法参数
    # Frame-diff plugin tuning parameters.
    self.frame_diff_threshold = frame_diff_threshold
    self.frame_diff_iou = frame_diff_iou
    # NOTE(review): debug_image is documented but not stored in the visible
    # lines — presumably handled further down (source is truncated here).

# (以下针对 deepstream-nvdsanalytics 的 ROI 区域过滤配置内容,原文此处乱码/被截断)