#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <opencv2/opencv.hpp>
#include <atomic>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
// 默认参数
static const int DEFAULT_WIDTH = 1280;
static const int DEFAULT_HEIGHT = 720;
static const int DEFAULT_FPS = 30;
static const int DEFAULT_BITRATE_KBPS = 2000000; // encoder 初始码率 (kbps)
std::atomic g_appsrc_ready(false);
// 全局 appsrc 指针(RTSP factory 在 media-configure 时取得)
static GstElement *g_appsrc = nullptr;
static std::atomic g_frame_index{0}; // 用于生成 PTS
// Helper: build caps for raw planar I420 video with the given geometry and
// frame rate. Caller owns the returned GstCaps (unref when done).
// BUGFIX: the original used typographic quotes (“…”), which do not compile.
static GstCaps* make_appsrc_caps(int width, int height, int fps) {
    return gst_caps_new_simple("video/x-raw",
                               "format", G_TYPE_STRING, "I420",
                               "width", G_TYPE_INT, width,
                               "height", G_TYPE_INT, height,
                               "framerate", GST_TYPE_FRACTION, fps, 1,
                               NULL);
}
// appsink "new-sample" callback: decoded I420 buffer -> cv::Mat -> draw
// overlay -> convert back to I420 -> push into the RTSP server's appsrc.
// Returns GST_FLOW_OK on recoverable problems so the source pipeline keeps
// running; GST_FLOW_ERROR only when no sample could be pulled.
static GstFlowReturn on_new_sample_from_sink(GstElement *appsink, gpointer user_data) {
    GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink));
    if (!sample) return GST_FLOW_ERROR;

    GstBuffer *buf = gst_sample_get_buffer(sample);
    GstCaps *caps = gst_sample_get_caps(sample);
    if (!buf || !caps) {
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    // Drop frames until the RTSP media has created the appsrc AND it asked
    // for data. BUGFIX: the original checked g_appsrc_ready only after
    // allocating out_buf and returned without unreffing it or unmapping the
    // input buffer, leaking one buffer + mapping per dropped frame. Checking
    // up-front also skips the two colorspace conversions for unused frames.
    if (!g_appsrc || !g_appsrc_ready) {
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    // Width/height come from the negotiated caps; the format was pinned to
    // I420 by the capsfilter in front of the appsink.
    GstStructure *s = gst_caps_get_structure(caps, 0);
    int width = 0, height = 0;
    gst_structure_get_int(s, "width", &width);
    gst_structure_get_int(s, "height", &height);

    GstMapInfo map;
    if (!gst_buffer_map(buf, &map, GST_MAP_READ)) {
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    // I420 memory layout: Y plane (W*H) followed by U and V planes
    // (W/2 * H/2 each).
    const size_t y_size = (size_t)width * (size_t)height;
    const size_t uv_size = (size_t)(width / 2) * (size_t)(height / 2);
    // Safety check: caps must be sane and the buffer large enough.
    if (width <= 0 || height <= 0 || map.size < y_size + 2 * uv_size) {
        gst_buffer_unmap(buf, &map);
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    // Wrap the raw I420 data as a (H*3/2) x W single-channel Mat, the layout
    // COLOR_YUV2BGR_I420 expects.
    // NOTE(review): this assumes tightly packed planes with no row stride
    // padding — true for videoconvert output, but hardware converters
    // (nvvidconv) may pad rows; use GstVideoMeta if artifacts appear. TODO confirm.
    cv::Mat yuv(height + height / 2, width, CV_8UC1, (uchar*)map.data);
    cv::Mat bgr;
    cv::cvtColor(yuv, bgr, cv::COLOR_YUV2BGR_I420);

    // --- Per-frame processing goes here (example: draw a green rectangle) ---
    cv::rectangle(bgr, cv::Point(50, 50), cv::Point(200, 200), cv::Scalar(0, 255, 0), 3);
    // Debug helpers (display / JPEG dump) intentionally left out of the hot path.

    // Convert the processed BGR frame back to planar I420
    // ((H*3/2) x W, CV_8UC1).
    cv::Mat yuv_out;
    cv::cvtColor(bgr, yuv_out, cv::COLOR_BGR2YUV_I420);

    // Copy the result into a fresh GstBuffer for the appsrc.
    const size_t out_size = yuv_out.total() * yuv_out.elemSize();
    GstBuffer *out_buf = gst_buffer_new_allocate(NULL, out_size, NULL);
    GstMapInfo out_map;
    if (!gst_buffer_map(out_buf, &out_map, GST_MAP_WRITE)) {
        gst_buffer_unref(out_buf);
        gst_buffer_unmap(buf, &map);
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }
    memcpy(out_map.data, yuv_out.data, out_size);
    gst_buffer_unmap(out_buf, &out_map);

    // Timestamp from the frame counter at the nominal frame rate (the appsrc
    // runs in format=time). 64-bit scaling avoids overflow for long runs.
    const guint64 frame_id = g_frame_index.fetch_add(1);
    const GstClockTime pts = gst_util_uint64_scale(frame_id, GST_SECOND, DEFAULT_FPS);
    GST_BUFFER_PTS(out_buf) = pts;
    GST_BUFFER_DTS(out_buf) = pts;
    GST_BUFFER_DURATION(out_buf) = gst_util_uint64_scale_int(1, GST_SECOND, DEFAULT_FPS);

    // Push into the appsrc. The "push-buffer" action signal does NOT take
    // ownership of the buffer, so we must unref it ourselves afterwards.
    GstFlowReturn ret;
    g_signal_emit_by_name(g_appsrc, "push-buffer", out_buf, &ret);
    if (ret != GST_FLOW_OK) {
        g_printerr("appsrc push-buffer returned %d\n", ret);
    }
    gst_buffer_unref(out_buf);

    gst_buffer_unmap(buf, &map);
    gst_sample_unref(sample);
    return GST_FLOW_OK;
}
// appsink "eos" callback: the upstream RTSP source ended the stream.
// BUGFIX: the original string literal used typographic quotes (“…”),
// which do not compile.
static void on_eos(GstElement *sink, gpointer user_data) {
    g_print("Appsink got EOS\n");
}
static void on_preroll(GstElement *sink, gpointer user_data) {
g_print(“Appsink preroll\n”);
}
// appsrc "need-data" callback: the RTSP media pipeline is running and wants
// buffers, so let the appsink callback start pushing frames.
// BUGFIX: the original called gst_element_get_state(..., GST_CLOCK_TIME_NONE)
// here, which can block a streaming thread indefinitely during async state
// changes, and logged on every single need-data emission.
void need_data_fun(GstElement *src, guint size, gpointer user_data) {
    if (!g_appsrc_ready) {
        g_print("Appsrc ready, start feeding\n");
        g_appsrc_ready = true;
    }
}
// "media-configure" callback: invoked when the RTSP server builds the media
// pipeline for a client. Looks up the appsrc (named "mysrc" in the factory
// launch string), configures it, and connects the need-data signal.
// BUGFIXES: removed the pointless ref/unref pair on `media`; release the
// reference to a previous media's appsrc before replacing it (the shared
// factory can reconfigure); reset g_appsrc_ready so pushing waits for the new
// appsrc's need-data; fixed a typographic-quote string literal.
static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data) {
    GstElement *element = gst_rtsp_media_get_element(media); // media's pipeline (ref'd)
    if (!element) {
        g_printerr("Failed to get media element\n");
        return;
    }
    // The name must match the factory launch string ("mysrc").
    GstElement *local_appsrc = gst_bin_get_by_name_recurse_up(GST_BIN(element), "mysrc");
    if (!local_appsrc) {
        g_printerr("media_configure: cannot find appsrc 'mysrc'\n");
    } else {
        // Gate the appsink callback until the new appsrc asks for data, and
        // drop the ref we kept on any previous appsrc.
        g_appsrc_ready = false;
        if (g_appsrc) gst_object_unref(g_appsrc);
        g_appsrc = local_appsrc; // keep the ref returned by get_by_name
        g_object_set(G_OBJECT(g_appsrc),
                     "format", GST_FORMAT_TIME,
                     "is-live", TRUE,
                     "block", TRUE,
                     NULL);
        g_print("Appsrc obtained for media\n");
        g_signal_connect(local_appsrc, "need-data", G_CALLBACK(need_data_fun), NULL);
    }
    // Drop the pipeline ref (we keep only the appsrc ref in g_appsrc).
    gst_object_unref(element);
}
int main(int argc, char *argv) {
gst_init(&argc, &argv);
if (argc < 3) {
std::cerr << “Usage: " << argv[0] << " \n”;
std::cerr << “Example: " << argv[0] << " rtsp://192.168.1.100:554/stream /relay\n”;
return -1;
}
std::string input_rtsp = argv[1];
std::string mount_point = argv[2];
// 1) 创建 source pipeline: rtspsrc -> rtph265depay -> h265parse -> avdec_h265 -> videoconvert -> capsfilter(I420) -> appsink
GstElement *src_pipeline = gst_pipeline_new("src-pipeline");
GstElement *rtspsrc = gst_element_factory_make("rtspsrc", "src_rtspsrc");
GstElement *depay = gst_element_factory_make("rtph265depay", "depay");
GstElement *h265parse = gst_element_factory_make("h265parse", "hpar");
GstElement *decoder = gst_element_factory_make("nvv4l2decoder", "dec"); // 使用软件解码,Jetson 可改为 nvv4l2decoder
GstElement *videoconv = gst_element_factory_make("nvvidconv", "videoconv_src");
GstElement *capsfilter = gst_element_factory_make("capsfilter", "capsfilter_src");
GstElement *appsink = gst_element_factory_make("appsink", "appsink");
if (!src_pipeline || !rtspsrc || !depay || !h265parse || !decoder || !videoconv || !capsfilter || !appsink) {
g_printerr("Failed to create source pipeline elements. Ensure GStreamer plugins are installed.\n");
return -1;
}
g_object_set(G_OBJECT(rtspsrc), "location", input_rtsp.c_str(), "latency", 50, NULL);
// appsink properties
g_object_set(G_OBJECT(appsink),
"emit-signals", TRUE,
"sync", FALSE,
"max-buffers", 5,
"drop", TRUE,
NULL);
// caps: 要求 appsink 接受 I420
GstCaps *src_caps = make_appsrc_caps(DEFAULT_WIDTH, DEFAULT_HEIGHT, DEFAULT_FPS);
g_object_set(G_OBJECT(capsfilter), "caps", src_caps, NULL);
gst_caps_unref(src_caps);
gst_bin_add_many(GST_BIN(src_pipeline), rtspsrc, depay, h265parse, decoder, videoconv, capsfilter, appsink, NULL);
// link static part: depay -> parse -> decode -> videoconvert -> capsfilter -> appsink
if (!gst_element_link_many(depay, h265parse, decoder, videoconv, capsfilter, appsink, NULL)) {
g_printerr("Failed to link source pipeline chain\n");
return -1;
}
// rtspsrc pad-added
g_signal_connect(rtspsrc, "pad-added", G_CALLBACK(+[] (GstElement *src, GstPad *new_pad, gpointer user_data) {
GstElement *depay = GST_ELEMENT(user_data);
GstPad *sinkpad = gst_element_get_static_pad(depay, "sink");
if (gst_pad_is_linked(sinkpad)) {
gst_object_unref(sinkpad);
return;
}
GstCaps *new_pad_caps = gst_pad_get_current_caps(new_pad);
GstStructure *str = gst_caps_get_structure(new_pad_caps, 0);
const gchar *name = gst_structure_get_name(str);
g_print("rtspsrc pad-added with caps %s\n", name);
GstPadLinkReturn ret = gst_pad_link(new_pad, sinkpad);
if (GST_PAD_LINK_FAILED(ret)) {
g_printerr("Type is '%s' but linking failed.\n", name);
} else {
g_print("Linked rtspsrc -> depay\n");
}
if (new_pad_caps) gst_caps_unref(new_pad_caps);
gst_object_unref(sinkpad);
}), depay);
// appsink callbacks
g_signal_connect(appsink, "new-sample", G_CALLBACK(on_new_sample_from_sink), nullptr);
g_signal_connect(appsink, "eos", G_CALLBACK(on_eos), nullptr);
g_signal_connect(appsink, "new-preroll", G_CALLBACK(on_preroll), nullptr);
// start source pipeline
gst_element_set_state(src_pipeline, GST_STATE_PLAYING);
// 2) RTSP server with appsrc pipeline: appsrc name=mysrc caps=I420 -> queue -> videoconvert -> x264enc/nvv4l2h265enc -> h265parse -> rtph265pay
GstRTSPServer *server = gst_rtsp_server_new();
GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(server);
GstRTSPMediaFactory *factory = gst_rtsp_media_factory_new();
// 注意: 我这里用 nvv4l2h265enc(Jetson 硬件)或 x265enc/openh265 之类可替换。保持 caps 为 I420,videoconvert 会做格式转换。
// "( appsrc name=mysrc ! nvvideoconvert ! nvv4l2h265enc bitrate=2000000 ! h265parse ! rtph265pay name=pay0 pt=96 )";
std::string launch_pipeline =
"( appsrc name=mysrc caps=\"video/x-raw,format=I420,width=" + std::to_string(DEFAULT_WIDTH) +
",height=" + std::to_string(DEFAULT_HEIGHT) + ",framerate=" + std::to_string(DEFAULT_FPS) + "/1\" is-live=true block=true format=time ) "
"! queue ! nvvidconv ! nvv4l2h265enc bitrate=" + std::to_string(DEFAULT_BITRATE_KBPS) +
" speed-preset=ultrafast tune=zerolatency ! "
"h264parse config-interval=1 ! rtph264pay name=pay0 pt=96";
// 如果你需要 H265,并在 Jetson 上使用硬件编码,替换为:
// "... ! videoconvert ! nvv4l2h265enc name=enc bitrate=2000000 iframeinterval=30 ! h265parse ! rtph265pay name=pay0 pt=96"
gst_rtsp_media_factory_set_launch(factory, launch_pipeline.c_str());
gst_rtsp_media_factory_set_shared(factory, TRUE);
g_signal_connect(factory, "media-configure", G_CALLBACK(media_configure), NULL);
gst_rtsp_mount_points_add_factory(mounts, mount_point.c_str(), factory);
g_object_unref(mounts);
if (gst_rtsp_server_attach(server, NULL) == 0) {
g_printerr("Failed to attach RTSP server\n");
return -1;
}
g_print("RTSP server ready at rtsp://<host>:8554%s\n", mount_point.c_str());
// GLib main loop
GMainLoop *loop = g_main_loop_new(NULL, FALSE);
g_print("Running main loop...\n");
g_main_loop_run(loop);
// Cleanup (not reached in normal run)
gst_element_set_state(src_pipeline, GST_STATE_NULL);
gst_object_unref(src_pipeline);
if (g_appsrc) gst_object_unref(g_appsrc);
g_main_loop_unref(loop);
return 0;
}
// (author note, moved into a comment so the file compiles) This is my code.