以下是提问模版,可以修改仅留下适用于您的问题。
• GPU pro 2000 backwell
**• 7.1 或 8.0 **
• TensorRT 版本
• 580.95.05
• Deepstream8.0 或者7.1 的版本添加自定义插件帧差法插件 以及 添加 ROI 区域过滤配置 两者的效果都没有成功
def reid_buffer_probe(self, pad, info, u_data):
“”“ReID特征提取Probe - 从对象元数据中提取特征和检测框”“”
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
# 🎯 创建按摄像头分组的检测结果列表
# 结构: [{'camera_id': 1, 'datasets': [detection1, detection2, ...]}, ...]
detectionList = []
camera_datasets_map = {} # 临时字典,用于按camera_id分组
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
frame_num = frame_meta.frame_num
pad_index = frame_meta.pad_index
self.frame_count = frame_num
# 🎥 根据pad_index找到对应的camera_id
camera_id = None
for cid, idx in self.stream_index_map.items():
if idx == pad_index:
camera_id = cid
break
if camera_id is None:
camera_id = pad_index # 使用pad_index作为备用
# 只在特定帧提取特征的数据
if (frame_num % self.frame_interval) == 0:
detect_time = time.time()
# 初始化该摄像头的datasets列表
if camera_id not in camera_datasets_map:
camera_datasets_map[camera_id] = []
# 🔑 遍历对象,从每个对象的user_meta中获取ReID特征
l_obj = frame_meta.obj_meta_list
# 检测的数组
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
# 遍历对象的user metadata
l_user = obj_meta.obj_user_meta_list
while l_user is not None:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
# 检查是否是tensor输出元数据
if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
# 检查是ReID的输出(unique-id=2)
if tensor_meta.unique_id == 2 and tensor_meta.num_output_layers >= 1:
try:
# 获取第一个输出层
if hasattr(pyds, 'get_nvds_LayerInfo'):
layer_info = pyds.get_nvds_LayerInfo(tensor_meta, 0)
else:
layer_info = tensor_meta.out_layer_info[0]
# 提取特征向量
features = self.extract_feature_vector(layer_info)
# 构建检测结果字典
detection = {
'bbox': [
int(obj_meta.rect_params.left),
int(obj_meta.rect_params.top),
int(obj_meta.rect_params.left + obj_meta.rect_params.width),
int(obj_meta.rect_params.top + obj_meta.rect_params.height)
],
'class_id': int(obj_meta.class_id),
'confidence': float(obj_meta.confidence),
'tracker_confidence': 0.5,
'resize_shape': (g_frame_height, g_frame_width),
'class_name': obj_meta.obj_label if obj_meta.obj_label else f"class_{obj_meta.class_id}",
'is_tracked': 1,
'feature': features,
'vehicle_color': 0,
'vehicle_type': 0,
'or_sport': 0,
'camera_id':camera_id
}
# 🎯 将检测结果添加到对应摄像头的datasets中
camera_datasets_map[camera_id].append(detection)
self.feature_count += 1
# 打印日志
# logger.info(f"[Camera {camera_id}][Frame {frame_num}] 🎯 对象#{obj_meta.object_id} 类别={obj_meta.class_id} 置信度={obj_meta.confidence:.2f}")
except Exception as e:
logger.warning(f"[Camera {camera_id}] 特征提取失败: {e}")
except StopIteration:
break
try:
l_user = l_user.next
except StopIteration:
break
except StopIteration:
break
try:
l_obj = l_obj.next
except StopIteration:
break
# 演示模式:同步可视化(绘制到OSD)
camera_vo = self.active_cameras.get(camera_id)
if camera_vo and getattr(camera_vo, 'demo_enabled', 0):
# 每帧都进行可视化,使用缓存的推理结果避免闪烁
self._smooth_car_visualization(frame_meta, batch_meta, camera_id)
try:
l_frame = l_frame.next
except StopIteration:
break
except StopIteration:
break
# 🎯 构建最终的 detectionList 带上当前时间
current_time = time.time()
for cam_id, datasets in camera_datasets_map.items():
if datasets: # 只添加有检测结果的摄像头
detectionList.append({
'camera_id': cam_id,
'datasets': datasets,
'current_time':current_time
})
# 📊 打印汇总信息
if detectionList:
total_detections = sum(len(item['datasets']) for item in detectionList)
logger.info(f"📊 本批次检测汇总: {len(detectionList)}个摄像头, 共{total_detections}个对象")
for item in detectionList:
logger.info(f" Camera {item['camera_id']}: {len(item['datasets'])}个对象")
# 🎯 保存检测结果到 TXT 文件(DEBUG_VIDEO 开启时)
for item in detectionList:
cam_id = item['camera_id']
datasets = item['datasets']
# 获取原始分辨率(从摄像头配置中获取)
camera_vo = self.active_cameras.get(cam_id)
if camera_vo:
orig_width = getattr(camera_vo, 'width', 1920)
orig_height = getattr(camera_vo, 'height', 1080)
else:
orig_width, orig_height = 1920, 1080
self._save_detection_to_txt(cam_id, datasets, current_time, orig_width, orig_height)
# 🔄 调用推理回调(如果有)
if self.inference_callback and detectionList:
self._async_batch_inference_callback(detectionList)
except Exception as e:
logger.error(f"⚠️ ReID Probe异常: {e}")
import traceback
traceback.print_exc()
return Gst.PadProbeReturn.OK
def _async_batch_inference_callback(self, detectionList):
"""
批量异步推理回调 - 处理多摄像头检测结果
Args:
detectionList: List[Dict] 格式,每个元素包含:
{
'camera_id': int,
'datasets': List[Dict], # 检测结果数组
'current_time': float
}
"""
logger.info(f"[DeepStreamPipelineManager] 📤 批量推理回调: {len(detectionList)}个摄像头")
try:
if self.inference_callback:
# ✅ 正确遍历 List 格式的 detectionList
for item in detectionList:
camera_id = item['camera_id']
datasets = item['datasets']
logger.info(f"[DeepStreamPipelineManager] Camera {camera_id}: {len(datasets)}个对象")
# 提交到线程池异步处理,并在完成后设置图片保存标志
logger.debug(f"[DeepStreamPipelineManager] 📤 提交到线程池处理")
future = self.thread_pool.submit(self.inference_callback, detectionList)
# 添加回调:在 inference_callback 完成后,设置图片保存标志
future.add_done_callback(lambda f: self._on_inference_complete(detectionList))
logger.debug(f"[DeepStreamPipelineManager] ✅ 已提交到线程池")
else:
logger.warning(f"[DeepStreamPipelineManager] ⚠️ 未设置推理回调函数")
except Exception as e:
logger.error(f"[DeepStreamPipelineManager] ❌ 批量推理回调异常: {e}")
import traceback
logger.error(traceback.format_exc())
def _on_inference_complete(self, detectionList):
"""
推理完成回调 - 设置图片保存标志
此时 VehicleProcessor.process 已完成,内存缓存已更新为最新数据
"""
try:
current_time = time.time()
with self.image_save_lock:
for item in detectionList:
camera_id = item['camera_id']
# 设置图片保存标志,标记该摄像头需要保存图片
self.image_save_flags[camera_id] = current_time
logger.debug(f"[DeepStreamPipelineManager] ✅ 设置图片保存标志: camera_id={camera_id}")
except Exception as e:
logger.error(f"[DeepStreamPipelineManager] ❌ 设置图片保存标志异常: {e}")
def extract_feature_vector(self, layer_info):
"""从NvDsInferLayerInfo提取特征向量(参考deepstream_reid_rtsp.py)"""
import ctypes
from ctypes import pythonapi, py_object
# 计算特征维度
feature_dim = 1
for i in range(layer_info.dims.numDims):
feature_dim *= layer_info.dims.d[i]
feature_vector = []
try:
ptr = layer_info.buffer
if not ptr:
return [0.0] * feature_dim
# 🔑 使用ctypes.pythonapi从PyCapsule中提取指针
pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
pythonapi.PyCapsule_GetPointer.argtypes = [py_object, ctypes.c_char_p]
ptr_value = pythonapi.PyCapsule_GetPointer(ptr, None)
if not ptr_value:
return [0.0] * feature_dim
# 将void指针转换为float指针
float_ptr = ctypes.cast(ptr_value, ctypes.POINTER(ctypes.c_float))
# 逐个读取float值
for i in range(feature_dim):
try:
feature_vector.append(float(float_ptr[i]))
except:
feature_vector.append(0.0)
except Exception as e:
print(f" ⚠️ 特征提取失败({e})")
feature_vector = [0.0] * feature_dim
# 🎯 L2 归一化(与 ONNX 保持一致)
feature_array = np.array(feature_vector, dtype=np.float32)
norm = np.linalg.norm(feature_array)
if norm > 1e-10:
feature_array = feature_array / norm
return feature_array.tolist()
def get_box_info(self, obj_meta):
"""从对象元数据中获取检测框信息"""
try:
# 获取边界框坐标
left = obj_meta.rect_params.left
top = obj_meta.rect_params.top
width = obj_meta.rect_params.width
height = obj_meta.rect_params.height
# 计算框的坐标 (x1, y1, x2, y2)
x1 = left
y1 = top
x2 = left + width
y2 = top + height
# 获取类别ID
class_id = obj_meta.class_id
return (x1, y1, x2, y2, class_id)
except:
return (0, 0, 0, 0, -1)
# 演示模式
def _generate_camera_configs(self):
"""为每个摄像头生成配置"""
for idx, (camera_id, camera_vo) in enumerate(self.active_cameras.items()):
self.stream_index_map[camera_id] = idx
# 生成RTMP地址(演示模式)
if getattr(camera_vo, 'is_demo', False):
from core.demo.aliyun_video import generate_push_url, generate_pull_url
push_url = generate_push_url(camera_id)
pull_url = generate_pull_url(camera_id, protocol="RTMP")
else:
push_url = None
pull_url = None
self.rtmp_configs[camera_id] = {
'push_url': push_url,
'pull_url': pull_url,
'sink_index': idx,
'is_demo': getattr(camera_vo, 'is_demo', False)
}
if push_url:
logger.info(f"[{camera_id}] 推流地址: {push_url}")
logger.info(f"[{camera_id}] 播放地址: {pull_url}")
else:
logger.info(f"[{camera_id}] 非演示模式,无推流地址")
# 车辆跟踪演示模式
def _generate_camera_car_track_configs(self):
"""为每个摄像头生成配置"""
for idx, (camera_id, camera_vo) in enumerate(self.active_cameras.items()):
self.stream_index_map[camera_id] = idx
# 生成RTMP地址(演示模式)
if getattr(camera_vo, 'demo_enabled', 0):
from core.demo.aliyun_video import generate_push_url, generate_pull_url
push_url = generate_push_url(camera_id)
pull_url = generate_pull_url(camera_id, protocol="RTMP")
else:
push_url = None
pull_url = None
self.rtmp_configs[camera_id] = {
'push_url': push_url,
'pull_url': pull_url,
'sink_index': idx,
'demo_enabled': getattr(camera_vo, 'demo_enabled', 0)
}
if push_url:
logger.info(f"[{camera_id}] 推流地址: {push_url}")
logger.info(f"[{camera_id}] 播放地址: {pull_url}")
self._publish_demo_url(camera_id, push_url, pull_url)
else:
logger.info(f"[{camera_id}] 非演示模式,无推流地址")
def _publish_demo_url(self, camera_id: str, push_url: str, pull_url: str) -> None:
"""推送车辆追踪演示推流地址"""
try:
from core.comm.mqtt_client import MQTTClient
logger.info(f"############# [demo_url_push] gggg")
payload = {
"camera_id": camera_id,
"push_url": push_url,
"pull_url": pull_url,
"event_type": "demo_url_push",
"source_id": camera_id,
"attributes": {},
"event_id": str(uuid.uuid4()),
"event_timestamp": int(time.time() * 1000),
}
MQTTClient.get_instance().publish_message("event/demo_url_push", payload, qos=1)
logger.info(f"############# [demo_url_push] [{camera_id}] done {pull_url}")
except Exception as e:
logger.error(f"[{camera_id}] 推流地址上报失败: {e}")
def _update_pipeline_stats(self):
"""定期输出Pipeline运行统计信息"""
if not self.running:
return
self._stats_frames_since_report += 1
now = time.time()
elapsed = now - self._stats_last_report_time
if elapsed >= self._stats_interval:
fps = (self._stats_frames_since_report / elapsed) if elapsed > 0 else 0.0
interval_values = list(self.camera_inference_intervals.values())
if interval_values:
interval_summary = f"{min(interval_values):.2f}~{max(interval_values):.2f}s"
else:
interval_summary = "N/A"
logger.info(
f"[GPU {self.gpu_id}] Pipeline运行统计 | 最近{elapsed:.1f}s推理帧: {self._stats_frames_since_report} "
f"| 平均FPS: {fps:.2f} | 摄像头数: {len(self.active_cameras)} "
f"| 推理间隔: {interval_summary} | nvinfer.interval: {g_inference_interval} "
f"| 编码器: {self.encoder_name or 'N/A'} | 状态: {'running' if self.running else 'stopped'}"
)
self._stats_last_report_time = now
self._stats_frames_since_report = 0
def create_pipeline(self):
"""创建单GPU的Pipeline,所有组件使用同一个GPU"""
logger.info(f"[GPU {self.gpu_id}] 创建Pipeline,流数: {len(self.active_cameras)}")
self.pipeline = Gst.Pipeline()
# ====== 输入部分:多路RTSP源 ======
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
if not streammux:
raise RuntimeError("无法创建 nvstreammux 元素")
# 对性能影响很重要的参数,设置为 8
streammux.set_property('batch-size', len(self.active_cameras))
# streammux.set_property('width', 1920)
# streammux.set_property('height', 1080)
# 归一化处理,适配大部分场景
streammux.set_property('width', g_frame_width)
streammux.set_property('height', g_frame_height)
streammux.set_property('enable-padding', 1) # 保持长宽比
# 25 fps
streammux.set_property('batched-push-timeout', 40000)
streammux.set_property('live-source', 1)
# 🎯 使用该Pipeline指定的GPU
streammux.set_property('gpu-id', self.gpu_id)
streammux.set_property('nvbuf-memory-type', 0)
# 🔧 FIX: 防止单个源EOS导致整个pipeline结束
try:
streammux.set_property('drop-pipeline-eos', True)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 已启用 drop-pipeline-eos,忽略单源EOS")
except Exception as e:
logger.warning(f"[GPU {self.gpu_id}] 无法设置drop-pipeline-eos: {e}")
# 🔧 FIX: 对于实时RTSP流,不同步输入以提高稳定性
try:
streammux.set_property('sync-inputs', 0)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 已禁用sync-inputs,提高RTSP稳定性")
except Exception as e:
logger.warning(f"[GPU {self.gpu_id}] 无法设置sync-inputs: {e}")
self.pipeline.add(streammux)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 使用 GPU {self.gpu_id} (已应用EOS防护)")
# 添加source bins
for camera_id, camera_vo in self.active_cameras.items():
sink_idx = self.stream_index_map[camera_id]
src_bin = self._create_source_bin(camera_id, camera_vo.rtsp_url, self.gpu_id)
self.pipeline.add(src_bin)
sinkpad = streammux.request_pad_simple(f"sink_{sink_idx}")
srcpad = src_bin.get_static_pad("src")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
raise RuntimeError(f"无法连接 {camera_id} 到 streammux")
logger.info(f"[Pipeline] 链接流 {camera_id} 到 sink_{sink_idx}")
# # ====== 推理部分:批处理推理 ======
# queue1 = Gst.ElementFactory.make("queue", "queue1")
# pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
# if not pgie:
# raise RuntimeError("无法创建 nvinfer 元素")
#
# # 🎯 使用该GPU对应的配置文件
# yolov8_config = get_yolov8_config_path(self.gpu_id)
# pgie.set_property('config-file-path', yolov8_config)
# ====== 创建帧差法元素(在YOLO前进行运动检测) ======
logger.info("=" * 80)
logger.info("🎬 开始创建帧差法运动检测元素...")
logger.info("=" * 80)
# 🎯 加载本地的帧差法插件(不需要安装到系统)
framediff_plugin_path = "/sgx/prod/eStellar/DeepStream-Yolo/frame_diff"
framediff_so_path = os.path.join(framediff_plugin_path, "libgstframediff.so")
logger.info(f"📂 检查帧差法插件路径: {framediff_plugin_path}")
if not os.path.exists(framediff_plugin_path):
logger.error(f"❌ 帧差法插件目录不存在: {framediff_plugin_path}")
self.framediff = None
elif not os.path.exists(framediff_so_path):
logger.error(f"❌ 帧差法插件文件不存在: {framediff_so_path}")
logger.error(f" 请先编译插件: cd {framediff_plugin_path} && make")
self.framediff = None
else:
logger.info(f"✅ 找到帧差法插件: {framediff_so_path}")
logger.info(f" 文件大小: {os.path.getsize(framediff_so_path) / 1024:.1f} KB")
# 加载插件
logger.info(f"🔧 正在加载帧差法插件...")
registry = Gst.Registry.get()
scan_result = registry.scan_path(framediff_plugin_path)
logger.info(f" 扫描结果: {scan_result}")
# 尝试创建元素
logger.info(f"🔧 尝试创建 framediff 元素...")
self.framediff = Gst.ElementFactory.make("framediff", "framediff")
if not self.framediff:
logger.error("❌ 无法创建 framediff 元素!")
logger.error(" 可能原因:")
logger.error(" 1. 插件未正确编译")
logger.error(" 2. 缺少依赖库(CUDA、DeepStream)")
logger.error(" 3. GStreamer 无法识别插件")
logger.error(f" 编译命令: cd {framediff_plugin_path} && make")
logger.error(" 调试命令: gst-inspect-1.0 framediff")
self.framediff = None
else:
logger.info("✅ framediff 元素创建成功!")
# 🎯 配置帧差法参数(与 YOLO 同步:每 25 帧执行一次)
logger.info("🔧 配置帧差法参数...")
self.framediff.set_property("diff-threshold", 30) # 像素差异阈值 (0-255)
self.framediff.set_property("motion-threshold", 0.01) # 运动比例阈值 (0.0-1.0)
self.framediff.set_property("enable-debug", True) # 🎯 启用调试输出
self.framediff.set_property("gpu-id", self.gpu_id) # GPU ID
self.framediff.set_property("interval", 25) # 🎯 每 25 帧计算一次(与 YOLO 同步)
self.pipeline.add(self.framediff)
logger.info("=" * 80)
logger.info("🎉 帧差法配置完成!")
logger.info(f" GPU ID: {self.gpu_id}")
logger.info(f" 推理间隔: 每 25 帧执行一次 (与 YOLO 同步)")
logger.info(f" 像素差异阈值: 30")
logger.info(f" 运动比例阈值: 0.01 (1%)")
logger.info(f" 调试模式: 已启用")
logger.info("=" * 80)
# 创建YOLO主检测器
print("✓ 创建YOLO主检测器...")
self.pgie = Gst.ElementFactory.make("nvinfer", "pgie")
if not self.pgie:
print("✗ 无法创建YOLO检测器")
return False
self.pgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_primary_yoloV8.txt")
self.pgie.set_property("batch-size", 6)
self.pgie.set_property("unique-id", 1)
self.pgie.set_property("process-mode", 1)
self.pipeline.add(self.pgie)
print(f" 配置: batch-size={6}, process-mode=1 (Full frame)")
# 创建ReID副检测器
print("✓ 创建ReID副检测器...")
self.sgie = Gst.ElementFactory.make("nvinfer", "sgie")
if not self.sgie:
print("✗ 无法创建ReID检测器")
return False
# 🎯 使用该Pipeline指定的GPU进行推理
self.sgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_reid.txt")
self.sgie.set_property("batch-size", 6)
self.sgie.set_property("unique-id", 2)
self.sgie.set_property("process-mode", 2)
self.pipeline.add(self.sgie)
print(f" 配置: batch-size=6, process-mode=2 (Object level)")
# 创建其他元素
print("✓ 创建其他处理元素...")
nvconv = Gst.ElementFactory.make("nvvideoconvert", "nvconv")
nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd")
sink = Gst.ElementFactory.make("fakesink", "sink")
for elem in [nvconv, nvosd, sink]:
if not elem:
print("✗ 无法创建元素")
return False
self.pipeline.add(elem)
sink.set_property("sync", False)
# ========== 创建 tee 分支(检测+ReID 和 图片保存) ==========
if self.debug_image:
print("✓ 创建 tee 分支(启用图片保存调试)...")
tee = Gst.ElementFactory.make("tee", "tee")
if not tee:
print("✗ 无法创建 tee 元素")
return False
self.pipeline.add(tee)
# 分支1:检测 + ReID 分支
queue_infer = Gst.ElementFactory.make("queue", "queue_infer")
if not queue_infer:
print("✗ 无法创建 queue_infer")
return False
self.pipeline.add(queue_infer)
# 分支2:图片保存分支
queue_image = Gst.ElementFactory.make("queue", "queue_image")
nvconv_image = Gst.ElementFactory.make("nvvideoconvert", "nvconv_image")
capsfilter_image = Gst.ElementFactory.make("capsfilter", "capsfilter_image")
sink_image = Gst.ElementFactory.make("fakesink", "sink_image")
for elem in [queue_image, nvconv_image, capsfilter_image, sink_image]:
if not elem:
print("✗ 无法创建图片保存分支元素")
return False
self.pipeline.add(elem)
# 设置图片分支的 caps(转换为 BGR 格式供 OpenCV 使用)
caps_image = Gst.Caps.from_string("video/x-raw, format=BGRx")
capsfilter_image.set_property("caps", caps_image)
sink_image.set_property("sync", False)
else:
print("✓ 跳过 tee 分支(图片保存调试已禁用)...")
tee = None
queue_infer = None
# 链接所有元素
print("🔗 链接Pipeline...")
if self.debug_image:
# ========== 启用图片保存调试:使用 tee 分支 ==========
# 根据是否有 framediff 插件决定链接方式
if self.framediff:
# 有帧差法 (CUDA 版本): streammux -> framediff -> tee
link_success = (streammux.link(self.framediff) and
self.framediff.link(tee))
print(" 链路: streammux -> framediff (CUDA) -> tee")
else:
# 无帧差法: streammux -> tee
link_success = streammux.link(tee)
print(" 链路: streammux -> tee")
if not link_success:
print("✗ Pipeline链接到tee失败")
return False
# 链接 tee 的分支1:检测 + ReID
# tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink
tee_src_pad_infer = tee.request_pad_simple("src_%u")
queue_infer_sink_pad = queue_infer.get_static_pad("sink")
if tee_src_pad_infer.link(queue_infer_sink_pad) != Gst.PadLinkReturn.OK:
print("✗ 无法链接 tee -> queue_infer")
return False
if not (queue_infer.link(self.pgie) and
self.pgie.link(self.sgie) and
self.sgie.link(nvconv) and
nvconv.link(nvosd) and
nvosd.link(sink)):
print("✗ 检测+ReID分支链接失败")
return False
print(" 分支1: tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink")
# 链接 tee 的分支2:图片保存
# tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image
tee_src_pad_image = tee.request_pad_simple("src_%u")
queue_image_sink_pad = queue_image.get_static_pad("sink")
if tee_src_pad_image.link(queue_image_sink_pad) != Gst.PadLinkReturn.OK:
print("✗ 无法链接 tee -> queue_image")
return False
if not (queue_image.link(nvconv_image) and
nvconv_image.link(capsfilter_image) and
capsfilter_image.link(sink_image)):
print("✗ 图片保存分支链接失败")
return False
print(" 分支2: tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image")
# 添加图片保存分支的 Probe
sink_image.get_static_pad("sink").add_probe(
Gst.PadProbeType.BUFFER, self._image_save_probe, 0)
else:
# ========== 禁用图片保存调试:直接链接,无 tee 分支 ==========
# 根据是否有 framediff 插件决定链接方式
if self.framediff:
# 有帧差法 (CUDA 版本,直接处理 NVMM buffer): streammux -> framediff -> pgie
link_success = (streammux.link(self.framediff) and
self.framediff.link(self.pgie))
print(" 链路: streammux -> framediff (CUDA) -> pgie -> sgie -> nvconv -> nvosd -> sink")
else:
# 无帧差法: streammux -> pgie
link_success = streammux.link(self.pgie)
print(" 链路: streammux -> pgie -> sgie -> nvconv -> nvosd -> sink")
if not link_success:
print("✗ Pipeline链接失败")
return False
# 直接链接检测和ReID
if not (self.pgie.link(self.sgie) and
self.sgie.link(nvconv) and
nvconv.link(nvosd) and
nvosd.link(sink)):
print("✗ 检测+ReID链接失败")
return False
# 添加Probe
print("📍 添加Probe...\n")
self.pgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
self.yolo_buffer_probe, 0)
self.sgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
self.reid_buffer_probe, 0)
# 禁用 deepstream 默认的可视化
# pgie.set_property('process-mode', 1) # 1=PROCESS_MODE_CLIP_OBJECTS
# # ====== 追踪部分:对象追踪(可选) ======
# nvtracker = None
# g_enable_tracking = False
# if g_enable_tracking:
# nvtracker = Gst.ElementFactory.make("nvtracker", "tracker")
# if not nvtracker:
# raise RuntimeError("无法创建 nvtracker 元素")
#
# # 配置追踪器 - 使用更保守的设置以避免内存崩溃
# nvtracker.set_property('tracker-width', 320) # 减少分辨率以降低内存使用
# nvtracker.set_property('tracker-height', 192) # 减少分辨率以降低内存使用
# # 🎯 使用该Pipeline指定的GPU进行跟踪
# nvtracker.set_property('gpu-id', self.gpu_id)
# nvtracker.set_property('ll-lib-file',
# '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
# logger.info(f"[GPU {self.gpu_id}] nvtracker 使用 GPU {self.gpu_id}")
#
# # 使用平衡配置文件(优先保障目标检测)
# balanced_config_path = '/sgx/prod/estellar/agent/config/tracker_config_balanced.yml'
# safe_config_path = '/sgx/prod/estellar/agent/config/tracker_config_safe.yml'
#
# if os.path.exists(balanced_config_path):
# nvtracker.set_property('ll-config-file', balanced_config_path)
# logger.info("使用平衡模式追踪器配置(优先保障目标检测)")
# elif os.path.exists(safe_config_path):
# nvtracker.set_property('ll-config-file', safe_config_path)
# logger.info("使用安全模式追踪器配置")
# else:
# nvtracker.set_property('ll-config-file', '/sgx/prod/estellar/agent/config/tracker_config.yml')
# logger.warning("平衡/安全配置文件不存在,使用默认配置")
# else:
# logger.info("目标追踪已禁用,跳过追踪器配置")
#
# # ====== 分流部分:nvstreamdemux ======
# nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "stream-demuxer")
# if not nvstreamdemux:
# raise RuntimeError("无法创建 nvstreamdemux 元素")
#
# # 添加核心元素
# elements = [queue1, pgie, nvstreamdemux]
# if nvtracker:
# elements.insert(2, nvtracker) # 在pgie和nvstreamdemux之间插入tracker
# for elem in elements:
# self.pipeline.add(elem)
#
# # 链接核心pipeline
# if not streammux.link(queue1):
# raise RuntimeError("无法连接 streammux -> queue1")
# if not queue1.link(pgie):
# raise RuntimeError("无法连接 queue1 -> pgie")
#
# # 根据是否启用追踪,选择不同的链接方式
# if nvtracker:
# if not pgie.link(nvtracker):
# raise RuntimeError("无法连接 pgie -> nvtracker")
# if not nvtracker.link(nvstreamdemux):
# raise RuntimeError("无法连接 nvtracker -> nvstreamdemux")
# logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvtracker -> nvstreamdemux")
# else:
# if not pgie.link(nvstreamdemux):
# raise RuntimeError("无法连接 pgie -> nvstreamdemux")
# logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvstreamdemux (无追踪)")
#
# # ====== 输出部分:为每个流创建独立的编码推流分支 ======
# for camera_id in self.active_cameras.keys():
# sink_idx = self.stream_index_map[camera_id]
# config = self.rtmp_configs[camera_id]
# self._create_output_branch(nvstreamdemux, camera_id, sink_idx, config)
#
# # 添加推理probe(在最后一个推理元素后获取结果)
# if nvtracker:
# # 有追踪器时,在tracker之后获取追踪结果
# probe_pad = nvtracker.get_static_pad("src")
# logger.info("推理probe已绑定到nvtracker输出")
# else:
# # 无追踪器时,在pgie之后获取检测结果
# probe_pad = pgie.get_static_pad("src")
# logger.info("推理probe已绑定到pgie输出(无追踪模式)")
#
# probe_pad.add_probe(Gst.PadProbeType.BUFFER, self._pgie_src_pad_buffer_probe, 0)
#
# 运行Pipeline
print("=" * 80)
print("🎬 运行中... (Ctrl+C停止)")
print("=" * 80 + "\n")
loop = GLib.MainLoop()
bus = self.pipeline.get_bus()
bus.add_signal_watch()
def on_message(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("\n✓ 到达EOS,停止Pipeline")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"\n✗ Pipeline错误: {err}")
print(f" 详情: {debug}")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print(f"⚠️ 警告: {err}")
return True
bus.connect("message", on_message, loop)
self.pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except KeyboardInterrupt:
print("\n⏹️ 停止...\n")
finally:
self.pipeline.set_state(Gst.State.NULL)
# self._print_summary()
return True
def _create_source_bin(self, camera_id, rtsp_url, gpu_id=0):
"""
创建source bin
Args:
camera_id: 摄像头ID
rtsp_url: RTSP地址
gpu_id: 分配的GPU ID
"""
bin_name = f"source-bin-{camera_id}"
nbin = Gst.Bin.new(bin_name)
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", None)
if not uri_decode_bin:
raise RuntimeError(f"无法创建 uridecodebin for {camera_id}")
# 设置RTSP相关属性
uri_decode_bin.set_property("uri", rtsp_url)
# 🔧 FIX: 防止提前EOS - 禁止在segment后发送EOS信号
try:
uri_decode_bin.set_property("eos-after-segment", False)
logger.info(f"[{camera_id}] 已禁用eos-after-segment,防止提前EOS")
except Exception as e:
logger.warning(f"[{camera_id}] 无法设置eos-after-segment: {e}")
# 🔧 FIX: 增加缓冲区大小和时间,提高稳定性
try:
# uri_decode_bin.set_property("buffer-size", 524288) # 512KB缓冲区
uri_decode_bin.set_property("buffer-duration", 2000000000) # 2秒缓冲时间(纳秒)
logger.info(f"[{camera_id}] 已设置缓冲区: 512KB, 2秒")
except Exception as e:
logger.warning(f"[{camera_id}] 无法设置缓冲区参数: {e}")
# 🔧 FIX: 配置连接参数,避免超时导致EOS
try:
uri_decode_bin.set_property("connection-speed", 0) # 0=无限制
logger.info(f"[{camera_id}] 已设置无限制连接速度")
except Exception as e:
logger.warning(f"[{camera_id}] 无法设置connection-speed: {e}")
# 🎯 添加 nvvideoconvert 将 raw buffer 转换为 NVMM buffer
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv-src-{camera_id}")
if not nvvidconv:
raise RuntimeError(f"无法创建 nvvideoconvert for {camera_id}")
# 设置 GPU ID
nvvidconv.set_property("gpu-id", gpu_id)
# 添加元素到 bin
Gst.Bin.add(nbin, uri_decode_bin)
Gst.Bin.add(nbin, nvvidconv)
# 连接 uridecodebin -> nvvideoconvert
# 🔧 FIX: 使用闭包捕获 camera_id,并过滤只连接视频 pad
cam_id_copy = camera_id # 显式捕获变量
def on_pad_added(dbin, pad, nvconv):
"""当 uridecodebin 添加 pad 时,连接到 nvvideoconvert"""
# 🎯 只处理视频 pad,忽略音频 pad
caps = pad.get_current_caps()
if caps is None:
caps = pad.query_caps(None)
if caps is None:
return
struct = caps.get_structure(0)
if struct is None:
return
name = struct.get_name()
if not name.startswith("video/"):
# 忽略非视频 pad(如音频)
return
sinkpad = nvconv.get_static_pad("sink")
if sinkpad and not sinkpad.is_linked():
ret = pad.link(sinkpad)
if ret == Gst.PadLinkReturn.OK:
logger.info(f"[{cam_id_copy}] 已连接 uridecodebin -> nvvideoconvert (视频)")
else:
logger.warning(f"[{cam_id_copy}] 连接失败: {ret}")
uri_decode_bin.connect("pad-added", on_pad_added, nvvidconv)
# 创建 ghost pad 连接到 nvvideoconvert 的输出
nvconv_srcpad = nvvidconv.get_static_pad("src")
nbin.add_pad(Gst.GhostPad.new("src", nvconv_srcpad))
logger.info(f"[{camera_id}] Source bin created for {rtsp_url} on GPU {gpu_id} (with nvvideoconvert)")
return nbin
def _create_output_branch(self, nvstreamdemux, camera_id, sink_idx, config):
"""为每个流创建独立的输出分支"""
logger.info(f"[{camera_id}] 创建输出分支 sink_{sink_idx}")
# 创建输出分支元素
queue_out = Gst.ElementFactory.make("queue", f"queue-out-{camera_id}")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv-{camera_id}")
nvosd = Gst.ElementFactory.make("nvdsosd", f"nvosd-{camera_id}")
if not all([queue_out, nvvidconv, nvosd]):
raise RuntimeError(f"无法创建 {camera_id} 的基础输出元素")
# 添加基础元素到pipeline
for elem in [queue_out, nvvidconv, nvosd]:
self.pipeline.add(elem)
# 链接基础元素
if not queue_out.link(nvvidconv):
raise RuntimeError(f"无法连接 {camera_id} queue_out -> nvvidconv")
if not nvvidconv.link(nvosd):
raise RuntimeError(f"无法连接 {camera_id} nvvidconv -> nvosd")
# 根据是否演示模式创建不同的输出
if config['is_demo'] and config['push_url']:
# 演示模式:创建RTMP推流输出
self._create_rtmp_output(camera_id, nvosd, config['push_url'])
else:
# 非演示模式:使用fakesink
self._create_fake_output(camera_id, nvosd)
# 连接demux到分支
demux_src_pad = nvstreamdemux.request_pad_simple(f"src_{sink_idx}")
queue_sink_pad = queue_out.get_static_pad("sink")
if demux_src_pad.link(queue_sink_pad) != Gst.PadLinkReturn.OK:
raise RuntimeError(f"无法连接 demux -> {camera_id} 输出分支")
logger.info(f"[{camera_id}] 输出分支创建完成")
def _create_rtmp_output(self, camera_id, nvosd, push_url):
"""创建RTMP推流输出"""
# 🎯 使用该Pipeline的编码器
if self.encoder_name is None:
error_msg = (
f"❌ 摄像头 {camera_id} 无法创建推流输出\n"
f" 原因: GPU {self.gpu_id} 硬件编码器不可用\n"
f" ⚠️ 系统拒绝启动该摄像头的推流分支"
)
logger.error(error_msg)
raise RuntimeError(f"GPU {self.gpu_id} 硬件编码器不可用")
# 使用检测到的编码器
encoder = Gst.ElementFactory.make(self.encoder_name, f"encoder-{camera_id}")
h264parse = Gst.ElementFactory.make("h264parse", f"h264parse-{camera_id}")
capsfilter = Gst.ElementFactory.make("capsfilter", f"capsfilter-{camera_id}")
queue_rtmp = Gst.ElementFactory.make("queue", f"queue-rtmp-{camera_id}")
flvmux = Gst.ElementFactory.make("flvmux", f"flvmux-{camera_id}")
queue_sink = Gst.ElementFactory.make("queue", f"queue-sink-{camera_id}")
rtmpsink = Gst.ElementFactory.make("rtmpsink", f"rtmpsink-{camera_id}")
if not all([encoder, h264parse, capsfilter, queue_rtmp, flvmux, queue_sink, rtmpsink]):
raise RuntimeError(f"无法创建 {camera_id} 的RTMP输出元素")
# 配置编码器
encoder.set_property('bitrate', 4000000)
# 🎯 设置编码器使用该Pipeline的GPU
if self.encoder_name == "nvv4l2h264enc":
# nvv4l2h264enc 不需要设置gpu-id,自动使用当前上下文的GPU
pass
elif self.encoder_name == "nvh264enc":
# nvh264enc 需要设置gpu-id
encoder.set_property('gpu-id', self.gpu_id)
caps = Gst.Caps.from_string("video/x-h264,stream-format=avc,alignment=au")
capsfilter.set_property("caps", caps)
flvmux.set_property("streamable", True)
rtmpsink.set_property('location', push_url)
rtmpsink.set_property('sync', False)
logger.info(f"[{camera_id}] 使用编码器 {self.encoder_name} on GPU {self.gpu_id}")
# 添加到pipeline
elements = [encoder, h264parse, capsfilter, queue_rtmp, flvmux, queue_sink, rtmpsink]
for elem in elements:
self.pipeline.add(elem)
# 链接元素
if not nvosd.link(encoder):
raise RuntimeError(f"无法连接 {camera_id} nvosd -> encoder")
if not encoder.link(h264parse):
raise RuntimeError(f"无法连接 {camera_id} encoder -> h264parse")
if not h264parse.link(capsfilter):
raise RuntimeError(f"无法连接 {camera_id} h264parse -> capsfilter")
if not capsfilter.link(queue_rtmp):
raise RuntimeError(f"无法连接 {camera_id} capsfilter -> queue_rtmp")
if not queue_rtmp.link(flvmux):
raise RuntimeError(f"无法连接 {camera_id} queue_rtmp -> flvmux")
if not flvmux.link(queue_sink):
raise RuntimeError(f"无法连接 {camera_id} flvmux -> queue_sink")
if not queue_sink.link(rtmpsink):
raise RuntimeError(f"无法连接 {camera_id} queue_sink -> rtmpsink")
logger.info(f"[{camera_id}] RTMP推流输出创建完成")
def _create_fake_output(self, camera_id, nvosd):
"""创建fakesink输出"""
fakesink = Gst.ElementFactory.make("fakesink", f"fakesink-{camera_id}")
if not fakesink:
raise RuntimeError(f"无法创建 {camera_id} 的fakesink")
fakesink.set_property("sync", False)
self.pipeline.add(fakesink)
if not nvosd.link(fakesink):
raise RuntimeError(f"无法连接 {camera_id} nvosd -> fakesink")
logger.info(f"[{camera_id}] fakesink输出创建完成")
def _pgie_src_pad_buffer_probe(self, pad, info, u_data):
"""
推理结果处理probe - 批处理多流
1. 解析帧检测结果
2. 同步可视化(演示模式)
3. 异步推理回调 - 添加频次控制
4. 每1000帧打印一次状态
5. 处理异常
6. 返回OK
"""
try:
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
frame_number = frame_meta.frame_num
pad_index = frame_meta.pad_index
# 根据pad_index找到对应的camera_id
camera_id = None
for cid, idx in self.stream_index_map.items():
if idx == pad_index:
camera_id = cid
break
if not camera_id:
l_frame = l_frame.next
continue
# 解析检测结果
detections = self._parse_frame_detections(frame_meta)
detect_time = time.time()
# 演示模式:同步可视化(绘制到OSD)
camera_vo = self.active_cameras.get(camera_id)
if camera_vo and getattr(camera_vo, 'is_demo', False):
# 每帧都进行可视化,使用缓存的推理结果避免闪烁
self._smooth_visualization(frame_meta, batch_meta, detections, camera_id)
# 异步推理回调 - 使用帧号控制(更可靠)
if self.inference_callback and detections:
# 构造兼容的frame_meta字典
frame_meta_dict = {
'frame_num': frame_meta.frame_num,
'pad_index': frame_meta.pad_index,
'source_frame_width': frame_meta.source_frame_width,
'source_frame_height': frame_meta.source_frame_height,
'resize_shape': (g_frame_height, g_frame_width),
'timestamp': getattr(frame_meta, 'buf_pts', 0),
'detect_time': detect_time
}
self._async_inference_callback(camera_id, detections, frame_meta_dict)
# 每1000帧打印一次状态
if frame_number % 1000 == 0:
logger.info(f"[{camera_id}] Frame {frame_number}, 检测数: {len(detections)}")
# 更新pipeline运行统计
self._update_pipeline_stats()
l_frame = l_frame.next
except Exception as e:
logger.error(f"PadProbe处理异常: {e}")
logger.error(traceback.format_exc())
return Gst.PadProbeReturn.OK
def _parse_frame_detections(self, frame_meta):
"""解析帧检测结果"""
detections = []
l_obj = frame_meta.obj_meta_list
# 定义需要的类别ID(根据YOLOv8的COCO类别)
VEHICLE_CLASS_IDS = {2, 3, 5, 7} # car, motorcycle, bus, truck
PERSON_CLASS_ID = 0 # person
ALLOWED_CLASS_IDS = VEHICLE_CLASS_IDS | {PERSON_CLASS_ID}
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
# 过滤:只保留车辆和人员
if obj_meta.class_id not in ALLOWED_CLASS_IDS:
l_obj = l_obj.next
continue
# 处理追踪ID - 兼容有无追踪器的情况
if g_enable_tracking:
object_id = obj_meta.object_id
tracker_confidence = float(obj_meta.tracker_confidence) if obj_meta.tracker_confidence > 0 else None
is_tracked = obj_meta.object_id != 0xFFFFFFFFFFFFFFFF
else:
# 无追踪模式:生成临时ID,无追踪置信度
object_id = 0xFFFFFFFFFFFFFFFF # 无效追踪ID
tracker_confidence = None
is_tracked = False
detection = {
'bbox': [
int(obj_meta.rect_params.left),
int(obj_meta.rect_params.top),
int(obj_meta.rect_params.left + obj_meta.rect_params.width),
int(obj_meta.rect_params.top + obj_meta.rect_params.height)
],
'object_id': object_id,
'class_id': int(obj_meta.class_id),
'confidence': float(obj_meta.confidence),
'tracker_confidence': tracker_confidence,
'resize_shape': (g_frame_height, g_frame_width),
'class_name': obj_meta.obj_label if obj_meta.obj_label else f"class_{obj_meta.class_id}",
'is_tracked': is_tracked
}
x1, y1, x2, y2 = detection["bbox"]
# 计算车辆判定点,y 坐标使用整个边界框的底部 + 边界框高度的 1/4,注意这里的纵坐标方向是向下的
center_x = (x1 + x2) / 2
center_y = ((y1 + y2) / 2) + (y2 - y1) / 4
detection["judge_point"] = (int(center_x), int(center_y))
detections.append(detection)
except Exception as e:
logger.error(f"解析检测对象失败: {e}")
try:
l_obj = l_obj.next
except:
break
return detections
def _parse_frame_reid_yolo(self, frame_mata):
detection = {
'bbox': [
int(obj_meta.rect_params.left),
int(obj_meta.rect_params.top),
int(obj_meta.rect_params.left + obj_meta.rect_params.width),
int(obj_meta.rect_params.top + obj_meta.rect_params.height)
],
'object_id': object_id,
'class_id': int(obj_meta.class_id),
'confidence': float(obj_meta.confidence),
'tracker_confidence': tracker_confidence,
'resize_shape': (g_frame_height, g_frame_width),
'class_name': obj_meta.obj_label if obj_meta.obj_label else f"class_{obj_meta.class_id}",
'is_tracked': is_tracked
}
def _smooth_visualization(self, frame_meta, batch_meta, detections, camera_id):
"""平滑可视化(演示模式)- 每帧基于缓存数据绘制,避免闪烁"""
try:
# 获取摄像机配置
from core.rule.config_manager import ConfigManager
from core.cache.state_cache import get_state_cache
config_manager = ConfigManager.get_instance()
camera_config = config_manager.get_camera(camera_id)
if not camera_config:
return
state_cache = get_state_cache()
frame_width = g_frame_width
frame_height = g_frame_height
# 场景1:车位检测场景
if camera_config.park_check_tag == 1 and hasattr(camera_config, 'spots') and camera_config.spots:
self._draw_parking_spots(frame_meta, batch_meta, camera_id, camera_config, state_cache, frame_width,
frame_height)
# 场景2:排队监测场景
if camera_config.queue_check_tag == 1:
self._draw_queue_monitoring(frame_meta, batch_meta, camera_id, camera_config, state_cache, detections,
frame_width, frame_height)
# 场景3:人员聚集检测场景 - 使用平滑绘制
if camera_config.crowd_check_tag == 1:
self._draw_crowd_monitoring_smooth(frame_meta, batch_meta, camera_id, camera_config, state_cache,
detections, frame_width, frame_height)
except Exception as e:
logger.error(f"平滑可视化失败: {e}")
logger.error(traceback.format_exc())
# 每个摄像头的上次更新时间缓存 {camera_id: last_update_time}
_camera_last_update_time: Dict[int, float] = {}
# 每个摄像头的上次图片保存时间缓存 {camera_id: last_save_time}
_camera_last_save_time: Dict[int, float] = {}
def _smooth_car_visualization(self, frame_meta, batch_meta, camera_id):
"""
平滑可视化(演示模式)- 每帧基于缓存数据绘制车辆信息
逻辑:
1. 获取该摄像头上一次的更新时间
2. 找到离这个时间最近的所有车辆(基于 last_update_time)
3. 绘制车辆边界框、车牌号
"""
try:
# 获取摄像机配置
from core.rule.config_manager import ConfigManager
from core.cache.vehicle_track_cache import VehicleTrackCacheManager
config_manager = ConfigManager.get_instance()
camera_config = config_manager.get_camera(camera_id)
if not camera_config:
return
frame_width = g_frame_width
frame_height = g_frame_height
# ========== 从内存缓存获取该摄像头的所有车辆 ==========
# 🎯 使用 car_latest=1 的数据(包含 feature,用于 RTMP 绘制)
track_cache_mgr = VehicleTrackCacheManager.get_instance()
cached_vehicles = track_cache_mgr.search_by_camera(camera_id, car_latest=1)
if not cached_vehicles:
return
# ========== 获取该摄像头上一次的更新时间 ==========
prev_update_time = self._camera_last_update_time.get(camera_id, 0)
# ========== 找到所有车辆中最新的 last_update_time ==========
latest_time = 0
for v in cached_vehicles:
update_time = v.get('last_update_time') or 0
if update_time > latest_time:
latest_time = update_time
if latest_time == 0:
return
# ========== 更新该摄像头的上次更新时间 ==========
self._camera_last_update_time[camera_id] = latest_time
# ========== 获取离上次更新时间最近的所有车辆 ==========
# 获取 last_update_time >= prev_update_time 的车辆(允许 0.1 秒误差)
if prev_update_time == 0:
# 第一次:获取最新时间的所有车辆
valid_vehicles = [v for v in cached_vehicles
if abs((v.get('last_update_time') or 0) - latest_time) <= 0.1]
else:
# 获取 >= 上次时间的所有车辆(新更新的)
valid_vehicles = [v for v in cached_vehicles
if (v.get('last_update_time') or 0) >= prev_update_time - 0.1]
# ========== 绘制每辆车的信息 ==========
for vehicle in valid_vehicles:
bbox = vehicle.get('bbox') or vehicle.get('last_position')
if not bbox or len(bbox) < 4:
continue
# 获取车牌号和位置信息
plate_no = vehicle.get('vehicle_plate_no', '')
g_track_id = vehicle.get('g_track_id', 0)
# 绘制边界框(青色)
self._draw_rectangle_osd(frame_meta, batch_meta, bbox, 0.0, 1.0, 1.0)
# 绘制判定点(红色圆点)
cx = (bbox[0] + bbox[2]) // 2
cy = (bbox[1] + bbox[3]) // 2 + (bbox[3] - bbox[1]) // 4
self._draw_circle_osd(frame_meta, batch_meta, cx, cy, 5, 1.0, 0.0, 0.0)
# 构建标签文本:车牌号
if plate_no:
label = f"{plate_no}"
else:
label = f"ID:{g_track_id}"
# 绘制标签(在边界框上方)
self._draw_label_osd(frame_meta, batch_meta, bbox[0], bbox[1] - 25, label)
except Exception as e:
logger.error(f"车辆可视化失败: {e}")
logger.error(traceback.format_exc())
def _image_save_probe(self, pad, info, u_data):
"""
图片保存分支的 Probe
从 tee 分支获取帧,检查是否有待保存的图片任务
"""
try:
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
# 获取 batch metadata
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
# 遍历每一帧
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
camera_id = frame_meta.source_id
# ✅ 检查是否有图片保存标志
should_save = False
with self.image_save_lock:
if camera_id in self.image_save_flags:
should_save = True
# 清除标志(避免重复保存)
del self.image_save_flags[camera_id]
logger.debug(f"[ImageSave] 检测到保存标志: camera_id={camera_id}")
# 只有在设置了标志时才保存图片
if not should_save:
l_frame = l_frame.next
continue
# 获取帧数据(转换为 numpy 数组)
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
if n_frame is None:
logger.warning(f"[ImageSave] 无法获取帧数据: camera_id={camera_id}")
l_frame = l_frame.next
continue
# 转换为 numpy 数组(BGRx 格式)
frame_image = np.array(n_frame, copy=True, order='C')
# 转换为 BGR 格式(OpenCV 使用)
frame_bgr = cv2.cvtColor(frame_image, cv2.COLOR_RGBA2BGR)
# ✅ 调用图片绘制和保存方法(直接从缓存读取最新数据)
self._smooth_car_visualization_image(frame_bgr, camera_id)
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
logger.error(f"[ImageSave] 处理帧异常: {e}")
logger.error(traceback.format_exc())
try:
l_frame = l_frame.next
except:
break
return Gst.PadProbeReturn.OK
except Exception as e:
logger.error(f"[ImageSave] Probe异常: {e}")
logger.error(traceback.format_exc())
return Gst.PadProbeReturn.OK
def _smooth_car_visualization_image(self, frame_bgr, camera_id):
"""
图片保存方法 - 用 OpenCV 绘制并保存每辆车的图片
逻辑:
1. 从内存缓存获取该摄像头最新更新的车辆
2. 为每辆车单独绘制一张图片(带边界框、车牌号、时间戳)
3. 保存到指定目录
"""
try:
import cv2
import os
from datetime import datetime
from core.cache.vehicle_track_cache import VehicleTrackCacheManager
# ========== 从内存缓存获取该摄像头的所有车辆 ==========
# 🎯 使用 car_latest=1 的数据(包含 feature,用于图片保存)
track_cache_mgr = VehicleTrackCacheManager.get_instance()
cached_vehicles = track_cache_mgr.search_by_camera(camera_id, car_latest=1)
if not cached_vehicles:
return
# ========== 获取该摄像头上一次的保存时间 ==========
prev_save_time = self._camera_last_save_time.get(camera_id, 0)
# ========== 找到所有车辆中最新的 last_update_time ==========
latest_time = 0
for v in cached_vehicles:
update_time = v.get('last_update_time') or 0
if update_time > latest_time:
latest_time = update_time
if latest_time == 0:
return
# ========== 更新该摄像头的上次保存时间 ==========
self._camera_last_save_time[camera_id] = latest_time
# ========== 获取离上次保存时间最近的所有车辆 ==========
if prev_save_time == 0:
# 第一次:获取最新时间的所有车辆
valid_vehicles = [v for v in cached_vehicles
if abs((v.get('last_update_time') or 0) - latest_time) <= 0.1]
else:
# 获取 >= 上次时间的所有车辆(新更新的)
valid_vehicles = [v for v in cached_vehicles
if (v.get('last_update_time') or 0) >= prev_save_time - 0.1]
if not valid_vehicles:
return
# ========== 创建保存目录(按年/月/日结构) ==========
now = datetime.now()
year = now.strftime("%Y")
month = now.strftime("%m")
day = now.strftime("%d")
# agent/car_detail/2026/01/27/
base_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "car_detail")
save_dir = os.path.join(base_dir, year, month, day)
os.makedirs(save_dir, exist_ok=True)
# ========== 为每辆车绘制并保存一张图片 ==========
for idx, vehicle in enumerate(valid_vehicles):
bbox = vehicle.get('bbox') or vehicle.get('last_position')
if not bbox or len(bbox) < 4:
continue
# 复制原始帧(每辆车一张独立的图片)
frame_copy = frame_bgr.copy()
# 获取车辆信息
plate_no = vehicle.get('vehicle_plate_no', '')
g_track_id = vehicle.get('g_track_id', 0)
# 绘制边界框(青色)
x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (255, 255, 0), 2) # BGR: 青色
# 绘制判定点(红色圆点)
cx = (x1 + x2) // 2
cy = (y1 + y2) // 2 + (y2 - y1) // 4
cv2.circle(frame_copy, (cx, cy), 5, (0, 0, 255), -1) # BGR: 红色
# 构建标签文本:车牌号
if plate_no:
label = f"{plate_no}"
else:
label = f"ID:{g_track_id}"
# 绘制标签背景(黑色半透明)
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(frame_copy, (x1, y1 - 25), (x1 + label_size[0], y1), (0, 0, 0), -1)
# 绘制标签文字(白色)
cv2.putText(frame_copy, label, (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 绘制时间戳(左上角)
timestamp_text = now.strftime("%Y-%m-%d %H:%M:%S")
cv2.putText(frame_copy, timestamp_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# ========== 构建文件名:时间_车牌号.jpg ==========
time_str = now.strftime("%Y%m%d%H%M%S") # 20260127173045
if plate_no:
filename = f"{time_str}_{plate_no}.jpg"
else:
filename = f"{time_str}_ID{g_track_id}.jpg"
filepath = os.path.join(save_dir, filename)
# 保存图片
cv2.imwrite(filepath, frame_copy)
logger.info(f"[ImageSave] 保存车辆图片: {filepath}")
except Exception as e:
logger.error(f"[ImageSave] 图片保存失败: camera_id={camera_id}, error={e}")
logger.error(traceback.format_exc())
# 本方法已经废弃
def _sync_visualization(self, frame_meta, batch_meta, detections, camera_id):
"""
同步可视化(演示模式)- 绘制到OSD(已弃用,使用 _smooth_visualization)
本方法已经废弃
"""
try:
# 获取摄像头配置
from core.rule.config_manager import ConfigManager
from core.cache.state_cache import get_state_cache
config_manager = ConfigManager.get_instance()
state_cache = get_state_cache()
camera_config = config_manager.get_camera(camera_id)
if not camera_config:
logger.warning(f"未找到摄像头 {camera_id} 的配置")
return
# 获取帧尺寸
frame_width = g_frame_width
frame_height = g_frame_height
# 场景1:车位检测场景
if camera_config.park_check_tag == 1 and hasattr(camera_config, 'spots') and camera_config.spots:
self._draw_parking_spots(frame_meta, batch_meta, camera_id, camera_config, state_cache, frame_width,
frame_height)
# 场景2:排队监测场景
if camera_config.queue_check_tag == 1:
self._draw_queue_monitoring(frame_meta, batch_meta, camera_id, camera_config, state_cache, detections,
frame_width, frame_height)
# 场景3:人员聚集检测场景
if camera_config.crowd_check_tag == 1:
self._draw_crowd_monitoring(frame_meta, batch_meta, camera_id, camera_config, state_cache, detections,
frame_width, frame_height)
# 绘制过滤后的检测结果(只显示车和人)
# self._draw_filtered_detections(frame_meta, batch_meta, detections)
except Exception as e:
logger.error(f"同步可视化失败: {e}")
logger.error(traceback.format_exc())
def _draw_parking_spots(self):
print("绘制车辆的计算")
def _draw_parking_spots(self, frame_meta, batch_meta, camera_id, camera_config, state_cache, frame_width,
frame_height):
"""绘制停车位"""
try:
parking_spot_states = state_cache.get_spot_states_by_camera_id(camera_id)
for spot_id, spot_config in camera_config.spots.items():
if hasattr(spot_config, 'coordinates') and spot_config.coordinates:
# 转换百分比坐标为像素坐标
pixel_coords = []
for point in spot_config.coordinates:
if isinstance(point, dict) and 'x' in point and 'y' in point:
pixel_x = int((point['x'] / 100) * frame_width)
pixel_y = int((point['y'] / 100) * frame_height)
pixel_coords.append([pixel_x, pixel_y])
elif isinstance(point, list) and len(point) >= 2:
pixel_x = int((point[0] / 100) * frame_width)
pixel_y = int((point[1] / 100) * frame_height)
pixel_coords.append([pixel_x, pixel_y])
if len(pixel_coords) >= 3: # 至少需要3个点构成多边形
# 获取车位状态
current_spot_status = "vacant" # 默认空闲
spot_state = parking_spot_states.get(spot_id)
if spot_state and 'current_status' in spot_state:
current_spot_status = spot_state['current_status']
# 设置颜色:占用为红色,空闲为绿色
if current_spot_status == "occupied":
color_r, color_g, color_b = 1.0, 0.0, 0.0 # 红色
else:
color_r, color_g, color_b = 0.0, 1.0, 0.0 # 绿色
# 使用DeepStream的多边形绘制
self._draw_polygon_osd(frame_meta, batch_meta, pixel_coords, color_r, color_g, color_b,
alpha=0.5)
except Exception as e:
logger.error(f"绘制停车位失败: {e}")
def _draw_queue_monitoring(self, frame_meta, batch_meta, camera_id, camera_config, state_cache, detections,
frame_width, frame_height):
"""绘制排队监测信息"""
try:
# 在左下角显示排队状态信息
status_lines = []
if hasattr(camera_config, 'queue_areas') and camera_config.queue_areas:
for area_id, area_config in camera_config.queue_areas.items():
# 获取排队区域状态
area_state = state_cache.get_queue_area_state(area_id)
vehicle_count = 0
if area_state and 'count' in area_state:
vehicle_count = area_state['count']
# 确定拥堵状态
if vehicle_count <= 5:
status = "畅通"
elif vehicle_count <= 10:
status = "轻度拥堵"
else:
status = "严重拥堵"
status_lines.append(f"{area_id}: {vehicle_count}辆 ({status})")
# 如果没有配置排队区域,显示总检测车辆数
if not status_lines:
total_vehicles = len(detections)
status_lines.append(f"检测车辆: {total_vehicles}辆")
# 在左下角绘制状态文本
self._draw_text_osd(frame_meta, batch_meta, status_lines, frame_width, frame_height)
except Exception as e:
logger.error(f"绘制排队监测信息失败: {e}")
def _draw_vehicle_detections(self, frame_meta, batch_meta, detections):
"""绘制车辆检测结果"""
try:
for detection in detections:
bbox = detection['bbox']
confidence = detection.get('confidence', 0.0)
class_id = detection.get('class_id', 0)
# 绘制边界框
# self._draw_rectangle_osd(frame_meta, batch_meta, bbox, 0.0, 1.0, 1.0) # 青色边界框
# 绘制判定点
cx = (bbox[0] + bbox[2]) // 2
cy = (bbox[1] + bbox[3]) // 2 + (bbox[3] - bbox[1]) // 4
self._draw_circle_osd(frame_meta, batch_meta, cx, cy, 5, 1.0, 0.0, 0.0) # 红色判定点
# 绘制标签(包含追踪ID)
object_id = detection.get('object_id', 0)
is_tracked = detection.get('is_tracked', False)
if is_tracked:
label = f"ID:{object_id} {class_id} {confidence:.2f}"
else:
label = f"{class_id} {confidence:.2f}"
# label = f"{class_id} {confidence:.2f}"
self._draw_label_osd(frame_meta, batch_meta, bbox[0], bbox[1] - 10, label)
except Exception as e:
logger.error(f"绘制车辆检测结果失败: {e}")
def _draw_polygon_osd(self, frame_meta, batch_meta, pixel_coords, color_r, color_g, color_b, alpha=0.5):
"""使用DeepStream OSD绘制多边形"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
# 绘制多边形边缘线条
if len(pixel_coords) >= 2:
for i in range(len(pixel_coords)):
next_i = (i + 1) % len(pixel_coords)
line_params = display_meta.line_params[display_meta.num_lines]
line_params.x1 = pixel_coords[i][0]
line_params.y1 = pixel_coords[i][1]
line_params.x2 = pixel_coords[next_i][0]
line_params.y2 = pixel_coords[next_i][1]
line_params.line_width = 5
line_params.line_color.red = color_r
line_params.line_color.green = color_g
line_params.line_color.blue = color_b
line_params.line_color.alpha = alpha
display_meta.num_lines += 1
if display_meta.num_lines >= 16: # DeepStream限制
break
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
logger.error(f"绘制多边形失败: {e}")
def _draw_rectangle_osd(self, frame_meta, batch_meta, bbox, color_r, color_g, color_b):
"""使用DeepStream OSD绘制矩形"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
rect_params = display_meta.rect_params[0]
rect_params.left = bbox[0]
rect_params.top = bbox[1]
rect_params.width = bbox[2] - bbox[0]
rect_params.height = bbox[3] - bbox[1]
rect_params.border_width = 2
rect_params.border_color.red = color_r
rect_params.border_color.green = color_g
rect_params.border_color.blue = color_b
rect_params.border_color.alpha = 0.5
rect_params.has_bg_color = 0
display_meta.num_rects = 1
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
logger.error(f"绘制矩形失败: {e}")
def _draw_circle_osd(self, frame_meta, batch_meta, cx, cy, radius, color_r, color_g, color_b):
"""使用DeepStream OSD绘制圆形"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
circle_params = display_meta.circle_params[0]
circle_params.xc = cx
circle_params.yc = cy
circle_params.radius = radius
circle_params.circle_color.red = color_r
circle_params.circle_color.green = color_g
circle_params.circle_color.blue = color_b
circle_params.circle_color.alpha = 0.5
circle_params.has_bg_color = 0
display_meta.num_circles = 1
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
logger.error(f"绘制圆形失败: {e}")
def _draw_label_osd(self, frame_meta, batch_meta, x, y, text):
"""使用DeepStream OSD绘制文本标签"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
text_params = display_meta.text_params[0]
text_params.display_text = text
text_params.x_offset = x
text_params.y_offset = y
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.red = 1.0
text_params.font_params.font_color.green = 1.0
text_params.font_params.font_color.blue = 1.0
text_params.font_params.font_color.alpha = 1.0
text_params.set_bg_clr = 1
text_params.text_bg_clr.red = 0.0
text_params.text_bg_clr.green = 0.0
text_params.text_bg_clr.blue = 0.0
text_params.text_bg_clr.alpha = 0.5
display_meta.num_labels = 1
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
logger.error(f"绘制文本标签失败: {e}")
def _draw_text_osd(self, frame_meta, batch_meta, text_lines, frame_width, frame_height):
"""在左下角绘制多行状态文本"""
try:
# 计算左下角位置
start_x = 20
start_y = frame_height - 20 - (len(text_lines) * 25) # 每行25像素高度
for i, text in enumerate(text_lines):
y_pos = start_y + (i * 25)
self._draw_label_osd(frame_meta, batch_meta, start_x, y_pos, text)
except Exception as e:
logger.error(f"绘制状态文本失败: {e}")
def _draw_crowd_monitoring_smooth(self, frame_meta, batch_meta, camera_id, camera_config,
state_cache, detections, frame_width, frame_height):
"""
平滑绘制人员聚集监测可视化 - 基于缓存数据,每帧绘制,避免闪烁
可视化效果:
1. 默认人员:红色边界框
2. 聚集状态人员:蓝色边界框
3. 检测到聚集(未确认):紫色边界框
4. 确认聚集状态:绿色边界框
5. 聚集区域信息:显示人数和状态
"""
try:
# 1. 过滤人员检测结果
person_detections = [d for d in detections if d.get('class_name') == 'person']
# 2. 获取聚集可视化数据(基于缓存,不依赖当前帧的检测结果)
crowd_vis_data = {}
if hasattr(camera_config, 'crowd_areas') and camera_config.crowd_areas:
for area_id in camera_config.crowd_areas.keys():
cache_key = f"crowd_vis_{camera_id}_{area_id}"
vis_data = state_cache.get_state(cache_key)
if vis_data:
# 检查数据是否过期(超过5秒的数据不显示)
current_time = time.time()
data_time = vis_data.get('timestamp', 0)
if current_time - data_time <= 5.0: # 5秒内的数据有效
crowd_vis_data[area_id] = vis_data
# 3. 绘制人员边界框 - 补充缺失的蓝色标注逻辑
for person in person_detections:
# 判断人员是否在聚集区域内
is_in_crowd = self._is_person_in_crowd_region(person, crowd_vis_data)
if is_in_crowd:
# 聚集状态:蓝色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 0.0, 0.0, 1.0)
else:
# 正常状态:红色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 1.0, 0.0, 0.0)
# 4. 绘制聚集区域边界和信息
for area_id, vis_data in crowd_vis_data.items():
if vis_data.get('crowd_region_bbox'):
crowd_bbox = vis_data['crowd_region_bbox']
person_count = vis_data.get('person_count', 0)
is_confirmed = vis_data.get('confirmed_crowded', False) # 是否已确认聚集
max_density = vis_data.get('max_density', 0.0)
if is_confirmed:
# 确认聚集状态:绿色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.0, 1.0, 0.0)
label = f"crowed: {person_count} density:{max_density:.1f}"
else:
# 检测到聚集但未确认:紫色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.5, 0.0, 1.0)
label = f"crow: {person_count} density:{max_density:.1f}"
# 在聚集区域上方显示人数信息
self._draw_label_osd(frame_meta, batch_meta, crowd_bbox[0], crowd_bbox[1] - 25, label)
except Exception as e:
logger.error(f"平滑绘制人员聚集监测失败: {e}")
# 废弃
def _draw_crowd_monitoring(self, frame_meta, batch_meta, camera_id, camera_config,
state_cache, detections, frame_width, frame_height):
"""
绘制人员聚集监测可视化(已弃用,使用 _draw_crowd_monitoring_smooth )
可视化效果:
1. 默认人员:红色边界框
2. 聚集状态人员:蓝色边界框
3. 检测到聚集(未确认):紫色边界框
4. 确认聚集状态:绿色边界框
"""
try:
# 1. 过滤人员检测结果
person_detections = [d for d in detections if d.get('class_name') == 'person']
# 2. 获取聚集可视化数据
crowd_vis_data = {}
if hasattr(camera_config, 'crowd_areas') and camera_config.crowd_areas:
for area_id in camera_config.crowd_areas.keys():
cache_key = f"crowd_vis_{camera_id}_{area_id}"
vis_data = state_cache.get_state(cache_key)
if vis_data:
crowd_vis_data[area_id] = vis_data
# 3. 绘制人员边界框
for person in person_detections:
# 判断人员是否在聚集区域内
is_in_crowd = self._is_person_in_crowd_region(person, crowd_vis_data)
if is_in_crowd:
# 聚集状态:蓝色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 0.0, 0.0, 1.0)
else:
# 正常状态:红色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, person['bbox'], 1.0, 0.0, 0.0)
# 4. 绘制聚集区域边界
for area_id, vis_data in crowd_vis_data.items():
if vis_data.get('crowd_region_bbox'):
crowd_bbox = vis_data['crowd_region_bbox']
person_count = vis_data.get('person_count', 0)
is_confirmed = vis_data.get('confirmed_crowded', False) # 是否已确认聚集
if is_confirmed:
# 确认聚集状态:绿色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.0, 1.0, 0.0)
label = f"确认聚集: {person_count}人"
else:
# 检测到聚集但未确认:紫色边界框
self._draw_rectangle_osd(frame_meta, batch_meta, crowd_bbox, 0.5, 0.0, 1.0)
label = f"检测聚集: {person_count}人"
# 在聚集区域上方显示人数信息
self._draw_label_osd(frame_meta, batch_meta, crowd_bbox[0], crowd_bbox[1] - 25, label)
except Exception as e:
logger.error(f"绘制人员聚集监测失败: {e}")
def _is_person_in_crowd_region(self, person: Dict, crowd_vis_data: Dict) -> bool:
"""判断人员是否在聚集区域内"""
try:
person_bbox = person['bbox']
person_center_x = (person_bbox[0] + person_bbox[2]) / 2
person_center_y = (person_bbox[1] + person_bbox[3]) / 2
for area_id, vis_data in crowd_vis_data.items():
if vis_data.get('crowd_detected') and vis_data.get('crowd_region_bbox'):
crowd_bbox = vis_data['crowd_region_bbox']
# 检查人员中心点是否在聚集区域内
if (crowd_bbox[0] <= person_center_x <= crowd_bbox[2] and
crowd_bbox[1] <= person_center_y <= crowd_bbox[3]):
return True
return False
except Exception as e:
logger.error(f"判断人员是否在聚集区域失败: {e}")
return False
def _async_inference_callback(self, camera_id, detections, frame_meta_dict):
"""异步执行推理回调"""
try:
if self.inference_callback and self.running and self.thread_pool:
# 检查线程池是否已关闭
if not self.thread_pool._shutdown:
self.thread_pool.submit(
self._safe_inference_callback,
camera_id, detections, frame_meta_dict
)
else:
logger.debug(f"线程池已关闭,跳过推理回调: {camera_id}")
except RuntimeError as e:
if "cannot schedule new futures after interpreter shutdown" in str(e):
logger.debug("解释器正在关闭,跳过推理回调")
# 设置标志防止后续提交
self.running = False
else:
logger.error(f"提交异步推理回调失败: {e}")
except Exception as e:
logger.error(f"提交异步推理回调失败: {e}")
def _safe_inference_callback(self, camera_id, detections, frame_meta_dict):
"""安全的推理回调执行"""
try:
self.inference_callback(camera_id, detections, frame_meta_dict)
except Exception as e:
logger.error(f"推理回调执行失败: {e}")
logger.error(f"回调异常详情: {traceback.format_exc()}")
def _bus_call(self, bus, message, loop):
"""处理bus消息"""
t = message.type
if t == Gst.MessageType.EOS:
logger.info("Pipeline End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
logger.warning(f"Pipeline Warning: {err}: {debug}")
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
error_message = str(err)
logger.error(f"Pipeline Error: {err}: {debug}")
# 分析错误类型,决定是否需要退出
should_exit = self._analyze_error_severity(error_message, debug)
if should_exit:
# 记录严重错误到日志,便于Docker看门狗检测
logger.error("Pipeline Error 严重错误,Agent即将退出 - Docker将重启容器")
logger.error(f"错误详情: {error_message}")
logger.error(f"调试信息: {debug}")
sys.exit(1) # 直接退出,由Docker重启机制处理
else:
logger.warning("Pipeline Error 非严重错误,继续运行")
# 可以在这里添加错误恢复逻辑
self._handle_recoverable_error(error_message, debug)
elif t == Gst.MessageType.ELEMENT:
# 🎯 处理帧差法运动检测消息
struct = message.get_structure()
if struct and struct.get_name() == "framediff-motion":
self._handle_framediff_motion(struct)
return True
def _handle_framediff_motion(self, struct):
"""
处理帧差法运动检测消息
消息结构:
- frame-number: 帧编号
- motion-pixels: 运动像素数
- total-pixels: 总像素数
- motion-ratio: 运动比例
- has-motion: 是否检测到运动
"""
try:
frame_number = struct.get_uint64("frame-number")[1]
motion_pixels = struct.get_uint64("motion-pixels")[1]
total_pixels = struct.get_uint64("total-pixels")[1]
motion_ratio = struct.get_double("motion-ratio")[1]
has_motion = struct.get_boolean("has-motion")[1]
# 调试输出(每 30 帧输出一次)
if frame_number % 30 == 0:
logger.info(
f"[FrameDiff] Frame#{frame_number}: "
f"{motion_pixels}/{total_pixels} ({motion_ratio*100:.2f}%) "
f"{'🟢 MOTION' if has_motion else '⚪ STILL'}"
)
# 可以在这里添加运动检测的业务逻辑
# 例如:只在检测到运动时才进行 YOLO 推理
# 或者:记录运动事件到数据库
except Exception as e:
logger.warning(f"处理帧差法消息失败: {e}")
def _analyze_error_severity(self, error_message: str, debug_info: str) -> bool:
"""
分析错误严重程度,决定是否需要退出程序 (适配Docker环境)
Args:
error_message: 错误消息
debug_info: 调试信息
Returns:
bool: True 表示需要退出,False 表示可以继续运行
"""
# 将错误消息转换为小写以便匹配
error_lower = error_message.lower()
debug_lower = debug_info.lower() if debug_info else ""
# 定义非严重错误模式(这些错误不应导致整个程序退出)
recoverable_errors = [
"could not open resource for reading and writing", # RTSP 连接失败
"timeout while waiting for server response", # RTSP 超时
"failed to connect", # 连接失败
"connection refused", # 连接被拒绝
"network unreachable", # 网络不可达
"host unreachable", # 主机不可达
"rtsp source error", # RTSP 源错误
"gst_rtspsrc_retrieve_sdp", # RTSP SDP 获取失败
"resource busy", # 资源忙
"buffering", # 缓冲问题
]
# 检查是否为可恢复错误
for pattern in recoverable_errors:
if pattern in error_lower or pattern in debug_lower:
logger.info(f"检测到可恢复错误模式: {pattern}")
return False
# 定义严重错误模式(这些错误需要退出程序,触发Docker重启)
critical_errors = [
"out of memory", # 内存不足
"cuda error", # CUDA 错误
"gpu error", # GPU 错误
"inference engine error", # 推理引擎错误
"pipeline creation failed", # Pipeline 创建失败
"element linking failed", # 元素链接失败
"segmentation fault", # 段错误
"core dumped", # 核心转储
"nvstreammux", # StreamMux错误
"nvinfer", # 推理错误
"nvtracker", # 追踪器错误
"gstreamer-critical", # GStreamer严重错误
"tensorrt", # TensorRT错误
"deepstream", # DeepStream错误
]
# 检查是否为严重错误
for pattern in critical_errors:
if pattern in error_lower or pattern in debug_lower:
logger.error(f"检测到严重错误模式: {pattern} - 将触发容器重启")
return True
# 连续错误检测:如果短时间内出现大量错误,也认为是严重问题
self._track_error_frequency(error_message)
if self._is_error_frequency_critical():
logger.error("错误频率过高,认定为严重故障 - 将触发容器重启")
return True
# 默认情况:如果无法确定错误类型,暂时不退出,记录警告
logger.warning(f"未知错误类型,暂不退出: {error_message}")
return False
def _track_error_frequency(self, error_message: str):
"""
跟踪错误发生频率 (Docker环境故障检测)
Args:
error_message: 错误消息
"""
try:
current_time = time.time()
self.error_timestamps.append(current_time)
# 清理过期的错误记录(超过时间窗口的)
cutoff_time = current_time - self.error_frequency_window
self.error_timestamps = [ts for ts in self.error_timestamps if ts >= cutoff_time]
except Exception as e:
logger.error(f"跟踪错误频率时出现异常: {e}")
def _is_error_frequency_critical(self) -> bool:
"""
检查错误频率是否达到严重级别 (Docker环境故障检测)
Returns:
bool: True 表示错误频率过高,False 表示正常
"""
try:
current_time = time.time()
cutoff_time = current_time - self.error_frequency_window
# 统计时间窗口内的错误数量
recent_errors = [ts for ts in self.error_timestamps if ts >= cutoff_time]
error_count = len(recent_errors)
if error_count >= self.critical_error_threshold:
logger.warning(f"错误频率过高: {error_count}个错误在{self.error_frequency_window}秒内")
return True
return False
except Exception as e:
logger.error(f"检查错误频率时出现异常: {e}")
return False
def _handle_recoverable_error(self, error_message: str, debug_info: str):
"""
处理可恢复的错误
Args:
error_message: 错误消息
debug_info: 调试信息
"""
try:
# 尝试提取出错的摄像头ID
failed_camera_id = self._extract_camera_id_from_error(debug_info)
if failed_camera_id:
logger.warning(f"摄像头 {failed_camera_id} 出现错误,将在稍后重试连接")
# 这里可以添加重连逻辑或错误计数
# 暂时只记录错误,不做处理
else:
logger.warning("无法确定出错的摄像头,错误已记录")
except Exception as e:
logger.error(f"处理可恢复错误时出现异常: {e}")
def _extract_camera_id_from_error(self, debug_info: str) -> Optional[str]:
"""
从错误调试信息中提取摄像头ID
Args:
debug_info: 调试信息
Returns:
Optional[str]: 摄像头ID,如果无法提取则返回None
"""
try:
import re
# 尝试匹配 source-bin-{camera_id} 模式
pattern = r'source-bin-([^/\s]+)'
match = re.search(pattern, debug_info)
if match:
camera_id = match.group(1)
logger.debug(f"从错误信息中提取到摄像头ID: {camera_id}")
return camera_id
return None
except Exception as e:
logger.error(f"提取摄像头ID时出现异常: {e}")
return None
def start(self):
"""启动pipeline"""
if self.running:
logger.warning("Pipeline已在运行")
return
logger.info("启动多输出pipeline")
def run_pipeline():
try:
self.create_pipeline()
# 设置bus
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.loop = GLib.MainLoop()
self.bus.connect("message", self._bus_call, self.loop)
# 启动pipeline
ret = self.pipeline.set_state(Gst.State.PAUSED)
if ret == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("无法设置pipeline为PAUSED状态")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("无法设置pipeline为PLAYING状态")
self.running = True
self._stats_last_report_time = time.time()
self._stats_frames_since_report = 0
logger.info("多输出Pipeline启动成功")
# 运行主循环
self.loop.run()
except Exception as e:
logger.error(f"Pipeline运行异常: {e}")
logger.error(traceback.format_exc())
sys.exit(1) # 直接退出,由watchdog重启
finally:
self.running = False
logger.info("Pipeline线程结束")
sys.exit(1) # 直接退出,由watchdog重启
self.thread = threading.Thread(target=run_pipeline, daemon=True)
self.thread.start()
# 等待pipeline启动
time.sleep(3)
def stop(self):
"""停止pipeline"""
if not self.running:
return
logger.info("停止pipeline")
self.running = False # 先设置运行标志为False,防止新的回调提交
try:
# 停止pipeline
if self.pipeline:
logger.debug("停止pipeline")
self.pipeline.set_state(Gst.State.NULL)
# 退出主循环
if self.loop:
logger.debug("退出主循环")
self.loop.quit()
# 注意:不关闭线程池,让它保持运行状态
# 线程池只在进程完全退出时才自动关闭
except Exception as e:
logger.error(f"停止pipeline异常: {e}")
logger.error(f"停止异常详情: {traceback.format_exc()}")
logger.info("Pipeline停止完成")
class DeepStreamPipelineManager:
“”"
DeepStream Pipeline 管理器 (重构版)
基于单Pipeline多输出架构
“”"
def __init__(self, frame_diff_threshold=30, frame_diff_iou=0.3, debug_image=False):
"""
初始化 DeepStream Pipeline 管理器
Args:
frame_diff_threshold: 帧差法阈值 (默认30)
frame_diff_iou: 帧差法交集阈值 (默认0.3)
debug_image: 是否启用图片保存调试 (默认False)
"""
self.current_pipeline = None
self.active_cameras = {} # {camera_id: camera_vo}
self.inference_callback = None
self.vis_callback = None
self.lock = threading.Lock()
# 保存帧差法参数
self.frame_diff_threshold = frame_diff_threshold
self.frame_diff_iou = frame_diff_iou
**
**针对deepstream-nvdsanalytics ** 蛀