################################################################################
nvdsanalytics 配置文件
ROI 区域过滤配置
################################################################################
[property]
enable=1
配置分辨率(必须与 streammux 一致!)
streammux 实际分辨率是 1920x1080(从 deepstream_pipeline_manager.py 第 1127-1128 行)
config-width=1280
config-height=720
osd-mode: 0=不显示, 1=显示线条和ROI, 2=显示所有信息
osd-mode=0
display-font-size=12
流 0 - 第一个摄像头的 ROI 配置
[roi-filtering-stream-0]
enable=1
ROI 区域坐标:左侧区域(可根据需要调整)
坐标:(0,0) → (960,0) → (960,1080) → (0,1080)
这定义了画面左半部分作为 ROI
roi-RF=0;200;750;200;0;720;750;720
inverse-roi=0 表示保留 ROI 内的对象,=1 表示保留 ROI 外的对象
inverse-roi=0
class-id=-1 表示所有类别,可指定特定类别如 class-id=0
class-id=-1 这里是ROI的配置。。。 def create_pipeline(self):
“”“创建单GPU的Pipeline,所有组件使用同一个GPU”“”
logger.info(f"[GPU {self.gpu_id}] 创建Pipeline,流数: {len(self.active_cameras)}")
self.pipeline = Gst.Pipeline()
# ====== 输入部分:多路RTSP源 ======
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
if not streammux:
raise RuntimeError("无法创建 nvstreammux 元素")
# 对性能影响很重要的参数,设置为 8
streammux.set_property('batch-size', len(self.active_cameras))
# streammux.set_property('width', 1920)
# streammux.set_property('height', 1080)
# 归一化处理,适配大部分场景
streammux.set_property('width', g_frame_width)
streammux.set_property('height', g_frame_height)
streammux.set_property('enable-padding', 1) # 保持长宽比
# 25 fps
streammux.set_property('batched-push-timeout', 40000)
streammux.set_property('live-source', 1)
# 🎯 使用该Pipeline指定的GPU
streammux.set_property('gpu-id', self.gpu_id)
streammux.set_property('nvbuf-memory-type', 0)
# 🔧 FIX: 防止单个源EOS导致整个pipeline结束
try:
streammux.set_property('drop-pipeline-eos', True)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 已启用 drop-pipeline-eos,忽略单源EOS")
except Exception as e:
logger.warning(f"[GPU {self.gpu_id}] 无法设置drop-pipeline-eos: {e}")
# 🔧 FIX: 对于实时RTSP流,不同步输入以提高稳定性
try:
streammux.set_property('sync-inputs', 0)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 已禁用sync-inputs,提高RTSP稳定性")
except Exception as e:
logger.warning(f"[GPU {self.gpu_id}] 无法设置sync-inputs: {e}")
self.pipeline.add(streammux)
logger.info(f"[GPU {self.gpu_id}] nvstreammux 使用 GPU {self.gpu_id} (已应用EOS防护)")
# 添加source bins
for camera_id, camera_vo in self.active_cameras.items():
sink_idx = self.stream_index_map[camera_id]
src_bin = self._create_source_bin(camera_id, camera_vo.rtsp_url, self.gpu_id)
self.pipeline.add(src_bin)
sinkpad = streammux.request_pad_simple(f"sink_{sink_idx}")
srcpad = src_bin.get_static_pad("src")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
raise RuntimeError(f"无法连接 {camera_id} 到 streammux")
logger.info(f"[Pipeline] 链接流 {camera_id} 到 sink_{sink_idx}")
# # ====== 推理部分:批处理推理 ======
# queue1 = Gst.ElementFactory.make("queue", "queue1")
# pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
# if not pgie:
# raise RuntimeError("无法创建 nvinfer 元素")
#
# # 🎯 使用该GPU对应的配置文件
# yolov8_config = get_yolov8_config_path(self.gpu_id)
# pgie.set_property('config-file-path', yolov8_config)
# ====== 创建帧差法元素(在YOLO前进行运动检测) ======
logger.info("=" * 80)
logger.info("🎬 开始创建帧差法运动检测元素...")
logger.info("=" * 80)
# 🎯 加载本地的帧差法插件(不需要安装到系统)
framediff_plugin_path = "/sgx/prod/eStellar/DeepStream-Yolo/frame_diff"
framediff_so_path = os.path.join(framediff_plugin_path, "libgstframediff.so")
logger.info(f"📂 检查帧差法插件路径: {framediff_plugin_path}")
if not os.path.exists(framediff_plugin_path):
logger.error(f"❌ 帧差法插件目录不存在: {framediff_plugin_path}")
self.framediff = None
elif not os.path.exists(framediff_so_path):
logger.error(f"❌ 帧差法插件文件不存在: {framediff_so_path}")
logger.error(f" 请先编译插件: cd {framediff_plugin_path} && make")
self.framediff = None
else:
logger.info(f"✅ 找到帧差法插件: {framediff_so_path}")
logger.info(f" 文件大小: {os.path.getsize(framediff_so_path) / 1024:.1f} KB")
# 加载插件
logger.info(f"🔧 正在加载帧差法插件...")
registry = Gst.Registry.get()
scan_result = registry.scan_path(framediff_plugin_path)
logger.info(f" 扫描结果: {scan_result}")
# 尝试创建元素
logger.info(f"🔧 尝试创建 framediff 元素...")
self.framediff = Gst.ElementFactory.make("framediff", "framediff")
if not self.framediff:
logger.error("❌ 无法创建 framediff 元素!")
logger.error(" 可能原因:")
logger.error(" 1. 插件未正确编译")
logger.error(" 2. 缺少依赖库(CUDA、DeepStream)")
logger.error(" 3. GStreamer 无法识别插件")
logger.error(f" 编译命令: cd {framediff_plugin_path} && make")
logger.error(" 调试命令: gst-inspect-1.0 framediff")
self.framediff = None
else:
logger.info("✅ framediff 元素创建成功!")
# 🎯 配置帧差法参数(与 YOLO 同步:每 25 帧执行一次)
logger.info("🔧 配置帧差法参数...")
self.framediff.set_property("diff-threshold", 30) # 像素差异阈值 (0-255)
self.framediff.set_property("motion-threshold", 0.01) # 运动比例阈值 (0.0-1.0)
self.framediff.set_property("enable-debug", True) # 🎯 启用调试输出
self.framediff.set_property("gpu-id", self.gpu_id) # GPU ID
self.framediff.set_property("interval", 25) # 🎯 每 25 帧计算一次(与 YOLO 同步)
self.pipeline.add(self.framediff)
logger.info("=" * 80)
logger.info("🎉 帧差法配置完成!")
logger.info(f" GPU ID: {self.gpu_id}")
logger.info(f" 推理间隔: 每 25 帧执行一次 (与 YOLO 同步)")
logger.info(f" 像素差异阈值: 30")
logger.info(f" 运动比例阈值: 0.01 (1%)")
logger.info(f" 调试模式: 已启用")
logger.info("=" * 80)
# 创建YOLO主检测器
print("✓ 创建YOLO主检测器...")
self.pgie = Gst.ElementFactory.make("nvinfer", "pgie")
if not self.pgie:
print("✗ 无法创建YOLO检测器")
return False
self.pgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_primary_yoloV8.txt")
self.pgie.set_property("batch-size", 6)
self.pgie.set_property("unique-id", 1)
self.pgie.set_property("process-mode", 1)
self.pipeline.add(self.pgie)
print(f" 配置: batch-size={6}, process-mode=1 (Full frame)")
# 创建ReID副检测器
print("✓ 创建ReID副检测器...")
self.sgie = Gst.ElementFactory.make("nvinfer", "sgie")
if not self.sgie:
print("✗ 无法创建ReID检测器")
return False
# 🎯 使用该Pipeline指定的GPU进行推理
self.sgie.set_property("config-file-path", "/sgx/prod/eStellar/agent/config/config_infer_reid.txt")
self.sgie.set_property("batch-size", 6)
self.sgie.set_property("unique-id", 2)
self.sgie.set_property("process-mode", 2)
self.pipeline.add(self.sgie)
print(f" 配置: batch-size=6, process-mode=2 (Object level)")
# 创建其他元素
print("✓ 创建其他处理元素...")
nvconv = Gst.ElementFactory.make("nvvideoconvert", "nvconv")
nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd")
sink = Gst.ElementFactory.make("fakesink", "sink")
for elem in [nvconv, nvosd, sink]:
if not elem:
print("✗ 无法创建元素")
return False
self.pipeline.add(elem)
sink.set_property("sync", False)
# ========== 创建 tee 分支(检测+ReID 和 图片保存) ==========
if self.debug_image:
print("✓ 创建 tee 分支(启用图片保存调试)...")
tee = Gst.ElementFactory.make("tee", "tee")
if not tee:
print("✗ 无法创建 tee 元素")
return False
self.pipeline.add(tee)
# 分支1:检测 + ReID 分支
queue_infer = Gst.ElementFactory.make("queue", "queue_infer")
if not queue_infer:
print("✗ 无法创建 queue_infer")
return False
self.pipeline.add(queue_infer)
# 分支2:图片保存分支
queue_image = Gst.ElementFactory.make("queue", "queue_image")
nvconv_image = Gst.ElementFactory.make("nvvideoconvert", "nvconv_image")
capsfilter_image = Gst.ElementFactory.make("capsfilter", "capsfilter_image")
sink_image = Gst.ElementFactory.make("fakesink", "sink_image")
for elem in [queue_image, nvconv_image, capsfilter_image, sink_image]:
if not elem:
print("✗ 无法创建图片保存分支元素")
return False
self.pipeline.add(elem)
# 设置图片分支的 caps(转换为 BGR 格式供 OpenCV 使用)
caps_image = Gst.Caps.from_string("video/x-raw, format=BGRx")
capsfilter_image.set_property("caps", caps_image)
sink_image.set_property("sync", False)
else:
print("✓ 跳过 tee 分支(图片保存调试已禁用)...")
tee = None
queue_infer = None
# 链接所有元素
print("🔗 链接Pipeline...")
if self.debug_image:
# ========== 启用图片保存调试:使用 tee 分支 ==========
# 根据是否有 framediff 插件决定链接方式
if self.framediff:
# 有帧差法 (CUDA 版本): streammux -> framediff -> tee
link_success = (streammux.link(self.framediff) and
self.framediff.link(tee))
print(" 链路: streammux -> framediff (CUDA) -> tee")
else:
# 无帧差法: streammux -> tee
link_success = streammux.link(tee)
print(" 链路: streammux -> tee")
if not link_success:
print("✗ Pipeline链接到tee失败")
return False
# 链接 tee 的分支1:检测 + ReID
# tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink
tee_src_pad_infer = tee.request_pad_simple("src_%u")
queue_infer_sink_pad = queue_infer.get_static_pad("sink")
if tee_src_pad_infer.link(queue_infer_sink_pad) != Gst.PadLinkReturn.OK:
print("✗ 无法链接 tee -> queue_infer")
return False
if not (queue_infer.link(self.pgie) and
self.pgie.link(self.sgie) and
self.sgie.link(nvconv) and
nvconv.link(nvosd) and
nvosd.link(sink)):
print("✗ 检测+ReID分支链接失败")
return False
print(" 分支1: tee -> queue_infer -> pgie -> sgie -> nvconv -> nvosd -> sink")
# 链接 tee 的分支2:图片保存
# tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image
tee_src_pad_image = tee.request_pad_simple("src_%u")
queue_image_sink_pad = queue_image.get_static_pad("sink")
if tee_src_pad_image.link(queue_image_sink_pad) != Gst.PadLinkReturn.OK:
print("✗ 无法链接 tee -> queue_image")
return False
if not (queue_image.link(nvconv_image) and
nvconv_image.link(capsfilter_image) and
capsfilter_image.link(sink_image)):
print("✗ 图片保存分支链接失败")
return False
print(" 分支2: tee -> queue_image -> nvconv_image -> capsfilter_image -> sink_image")
# 添加图片保存分支的 Probe
sink_image.get_static_pad("sink").add_probe(
Gst.PadProbeType.BUFFER, self._image_save_probe, 0)
else:
# ========== 禁用图片保存调试:直接链接,无 tee 分支 ==========
# 根据是否有 framediff 插件决定链接方式
if self.framediff:
# 有帧差法 (CUDA 版本,直接处理 NVMM buffer): streammux -> framediff -> pgie
link_success = (streammux.link(self.framediff) and
self.framediff.link(self.pgie))
print(" 链路: streammux -> framediff (CUDA) -> pgie -> sgie -> nvconv -> nvosd -> sink")
else:
# 无帧差法: streammux -> pgie
link_success = streammux.link(self.pgie)
print(" 链路: streammux -> pgie -> sgie -> nvconv -> nvosd -> sink")
if not link_success:
print("✗ Pipeline链接失败")
return False
# 直接链接检测和ReID
if not (self.pgie.link(self.sgie) and
self.sgie.link(nvconv) and
nvconv.link(nvosd) and
nvosd.link(sink)):
print("✗ 检测+ReID链接失败")
return False
# 添加Probe
print("📍 添加Probe...\n")
self.pgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
self.yolo_buffer_probe, 0)
self.sgie.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER,
self.reid_buffer_probe, 0)
# 禁用 deepstream 默认的可视化
# pgie.set_property('process-mode', 1) # 1=PROCESS_MODE_CLIP_OBJECTS
# # ====== 追踪部分:对象追踪(可选) ======
# nvtracker = None
# g_enable_tracking = False
# if g_enable_tracking:
# nvtracker = Gst.ElementFactory.make("nvtracker", "tracker")
# if not nvtracker:
# raise RuntimeError("无法创建 nvtracker 元素")
#
# # 配置追踪器 - 使用更保守的设置以避免内存崩溃
# nvtracker.set_property('tracker-width', 320) # 减少分辨率以降低内存使用
# nvtracker.set_property('tracker-height', 192) # 减少分辨率以降低内存使用
# # 🎯 使用该Pipeline指定的GPU进行跟踪
# nvtracker.set_property('gpu-id', self.gpu_id)
# nvtracker.set_property('ll-lib-file',
# '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
# logger.info(f"[GPU {self.gpu_id}] nvtracker 使用 GPU {self.gpu_id}")
#
# # 使用平衡配置文件(优先保障目标检测)
# balanced_config_path = '/sgx/prod/estellar/agent/config/tracker_config_balanced.yml'
# safe_config_path = '/sgx/prod/estellar/agent/config/tracker_config_safe.yml'
#
# if os.path.exists(balanced_config_path):
# nvtracker.set_property('ll-config-file', balanced_config_path)
# logger.info("使用平衡模式追踪器配置(优先保障目标检测)")
# elif os.path.exists(safe_config_path):
# nvtracker.set_property('ll-config-file', safe_config_path)
# logger.info("使用安全模式追踪器配置")
# else:
# nvtracker.set_property('ll-config-file', '/sgx/prod/estellar/agent/config/tracker_config.yml')
# logger.warning("平衡/安全配置文件不存在,使用默认配置")
# else:
# logger.info("目标追踪已禁用,跳过追踪器配置")
#
# # ====== 分流部分:nvstreamdemux ======
# nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "stream-demuxer")
# if not nvstreamdemux:
# raise RuntimeError("无法创建 nvstreamdemux 元素")
#
# # 添加核心元素
# elements = [queue1, pgie, nvstreamdemux]
# if nvtracker:
# elements.insert(2, nvtracker) # 在pgie和nvstreamdemux之间插入tracker
# for elem in elements:
# self.pipeline.add(elem)
#
# # 链接核心pipeline
# if not streammux.link(queue1):
# raise RuntimeError("无法连接 streammux -> queue1")
# if not queue1.link(pgie):
# raise RuntimeError("无法连接 queue1 -> pgie")
#
# # 根据是否启用追踪,选择不同的链接方式
# if nvtracker:
# if not pgie.link(nvtracker):
# raise RuntimeError("无法连接 pgie -> nvtracker")
# if not nvtracker.link(nvstreamdemux):
# raise RuntimeError("无法连接 nvtracker -> nvstreamdemux")
# logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvtracker -> nvstreamdemux")
# else:
# if not pgie.link(nvstreamdemux):
# raise RuntimeError("无法连接 pgie -> nvstreamdemux")
# logger.info("Pipeline链路: streammux -> queue1 -> pgie -> nvstreamdemux (无追踪)")
#
# # ====== 输出部分:为每个流创建独立的编码推流分支 ======
# for camera_id in self.active_cameras.keys():
# sink_idx = self.stream_index_map[camera_id]
# config = self.rtmp_configs[camera_id]
# self._create_output_branch(nvstreamdemux, camera_id, sink_idx, config)
#
# # 添加推理probe(在最后一个推理元素后获取结果)
# if nvtracker:
# # 有追踪器时,在tracker之后获取追踪结果
# probe_pad = nvtracker.get_static_pad("src")
# logger.info("推理probe已绑定到nvtracker输出")
# else:
# # 无追踪器时,在pgie之后获取检测结果
# probe_pad = pgie.get_static_pad("src")
# logger.info("推理probe已绑定到pgie输出(无追踪模式)")
#
# probe_pad.add_probe(Gst.PadProbeType.BUFFER, self._pgie_src_pad_buffer_probe, 0)
#
# 运行Pipeline
print("=" * 80)
print("🎬 运行中... (Ctrl+C停止)")
print("=" * 80 + "\n")
loop = GLib.MainLoop()
bus = self.pipeline.get_bus()
bus.add_signal_watch()
def on_message(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("\n✓ 到达EOS,停止Pipeline")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"\n✗ Pipeline错误: {err}")
print(f" 详情: {debug}")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print(f"⚠️ 警告: {err}")
return True
bus.connect("message", on_message, loop)
self.pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except KeyboardInterrupt:
print("\n⏹️ 停止...\n")
finally:
self.pipeline.set_state(Gst.State.NULL)
# self._print_summary()
return True 这个是第二个模型的获取 ROI的区域 但是获取不到检测区域内的检测物体。。