Hello, I've been working on a Qt app to display a RTSP stream on an embedded
device (TI AM5728). I would like to be able to change the location of the stream on the fly, as there are multiple stream locations on the network. I am able to change stream locations using the following method: 1) Receive Qt signal in a Qt slot callback, create a pad probe to catch the pipeline 2) in that pad probe callback, I create a new probe to catch an EOS event that I send down the pipeline. The probe is on the last 'processing' element before the sink. The EOS is sent to flush out the old stream information. 3) in the EOS callback, I remove the old rtspsrc from the pipeline, create a new one with the new location, and link it to a callback for its 'sometimes' pad. All of the above works as intended, however I am creating 7 new threads in my program each time I change source. As of yet I have no conclusions why they're being created, but my hypothesis is that previous rtspsrc elements are not being removed properly, or that I am creating more and more new pipelines rather than editing a single one. This has me quite stumped. Where in the following code would all these new threads be coming from?? #include "camerastream.h" #include <QTimer> //function prototypes for callbacks that nobody but me should access static void on_rtsp_pad_added(GstElement* element, GstPad *pad, gpointer data); static gboolean bus_call(GstBus *bus, GstMessage *message, gpointer *data); static GstPadProbeReturn on_pad_probe(GstPad *pad, GstPadProbeInfo *info, gpointer data); static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info, gpointer data); gchar *m_name; CameraStream::CameraStream(QThread *parent) : QThread(parent) { m_cameraStream = NULL; m_loop = NULL; m_name = NULL; QTimer *timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this, SLOT(slot_changeSource())); timer->start(10000); } CameraStream::~CameraStream(){ //kill media pipeline gst_element_set_state(m_cameraStream, GST_STATE_NULL); gst_object_unref(m_cameraStream); g_main_loop_unref(m_loop); } void CameraStream::run(){ //overrides QThread::run() GstBus *bus; guint busWatchId; GstElement *src, *depay, *parser, *decoder, *vpe, *filter, *sink; GstCaps *vpeCaps; m_loop = g_main_loop_new(NULL, FALSE); //create pipeline elements m_cameraStream = gst_pipeline_new("display_pipeline"); src = gst_element_factory_make("rtspsrc", "rtspsrc"); depay = gst_element_factory_make("rtpjpegdepay", "depay"); parser = gst_element_factory_make("jpegparse", "parser"); decoder = gst_element_factory_make("ducatijpegdec", "decoder"); vpe = gst_element_factory_make("vpe", "vpe"); filter = gst_element_factory_make("capsfilter", "filter"); sink = gst_element_factory_make("waylandsink", "sink"); if(!(m_cameraStream || src || depay || parser || decoder || vpe || filter || sink)){ qFatal("could not create pipeline elements"); exit(1); } g_object_set(G_OBJECT(src), "location", "rtsp://192.168.50.29/av0_1", "latency", 0, NULL); g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added), m_cameraStream); //add src caps? vpeCaps = gst_caps_from_string("video/x-raw, format=NV12, width=800, height=480"); //change this when Tomas' patch hits if(!vpeCaps){ qFatal("cannot create caps"); exit(1); } g_object_set(G_OBJECT(filter), "caps", vpeCaps, NULL); g_object_set(G_OBJECT(sink), "sync", false, NULL); //add and link elements to create full pipeline gst_bin_add_many(GST_BIN(m_cameraStream), src, depay, parser, decoder, vpe, sink, NULL); if(!gst_element_link_many(depay, parser, decoder, vpe, sink, NULL)){ qFatal("cannot link elements"); exit(1); } gst_caps_unref(vpeCaps); bus = gst_pipeline_get_bus(GST_PIPELINE(m_cameraStream)); busWatchId = gst_bus_add_watch(bus, GstBusFunc(bus_call), m_loop); gst_object_unref(bus); gst_element_set_state(m_cameraStream, GST_STATE_PLAYING); g_main_loop_run(m_loop); } void CameraStream::slot_changeSource(/*QString newUrl*/){ GstPad *pad; //needs to stay playing, otherwise the probe callback will never trigger qDebug("\nslot_changeSource"); GstElement* depay = gst_bin_get_by_name(GST_BIN(m_cameraStream), "depay"); if(depay){ //finds this qDebug("m_name: %s", m_name); pad = gst_element_get_static_pad(depay, "src"); if(pad){ qDebug("found pad"); gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, on_pad_probe, m_cameraStream, NULL); //first Data could be QString newUrl gst_element_set_state(m_cameraStream, GST_STATE_PLAYING); //needed to trip the on_rtsp_pad_added callback } else qDebug("did not find Pad"); } } //allow linking of the rtspsrc and jpegdepay elements, for whatever reason when linked with gst_element_link_many, rtspsrc doesn't link //manually link with the filter cap application/x-rtp static void on_rtsp_pad_added(GstElement* element, GstPad *pad, gpointer data){ gchar *name; GstElement *depay; qDebug("on_rtsp_pad_added"); m_name = gst_pad_get_name(pad); qDebug("on_rtsp_pad_added, rtspsrc pad name: %s", m_name); //depay = GST_ELEMENT(data); depay = gst_bin_get_by_name(GST_BIN(data), "depay"); if(depay){ qDebug("pad_added: found depay"); } if(element){ qDebug("pad_added: found element"); } else qDebug("pad_added: could not find element"); if(!gst_element_link_pads(element, m_name, depay, "sink")){ qFatal("pad_added: failed to link elements"); } g_free(name); } //peek into the messages on the bus to detect EOS (live so unneeded) and Error messages. //the unused bus variable is due to these parameters being part of a standard gstreamer template for bus callbacks. static gboolean bus_call(GstBus *bus, GstMessage *message, gpointer *data){ GMainLoop *loop = (GMainLoop*)data; switch(GST_MESSAGE_TYPE(message)){ case GST_MESSAGE_EOS: qDebug("end of stream\n"); g_main_loop_quit(loop); break; case GST_MESSAGE_ERROR:{ gchar *debug; GError *error; gst_message_parse_error(message, &error, &debug); g_free(debug); qFatal("GSTREAMER GST_MESSAGE_ERROR! %s", error->message); g_main_loop_quit(loop); break; } default:{ break; } } return TRUE; } static GstPadProbeReturn on_pad_probe(GstPad *pad, GstPadProbeInfo *info, gpointer data){ GstPad *srcPad, *sinkPad; GstElement *vpe, *depay; qDebug("on_pad_probe"); gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info)); //new probe for EOS, this one on the vpe vpe = gst_bin_get_by_name(GST_BIN(data), "vpe"); srcPad = gst_element_get_static_pad(vpe, "src"); gst_pad_add_probe(srcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe, data, NULL); //push EOS into the element, wait for the EOS to appear on the srcpad depay = gst_bin_get_by_name(GST_BIN(data), "depay"); sinkPad = gst_element_get_static_pad(depay, "sink"); gst_pad_send_event(sinkPad, gst_event_new_eos()); return GST_PAD_PROBE_OK; } static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info, gpointer data){ GstElement *rtspsrcOld, *rtspsrcNew, *depay; qDebug("event_probe"); if(GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) != GST_EVENT_EOS){ qDebug("NOT EOS"); return GST_PAD_PROBE_PASS; } else if(GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) == GST_EVENT_EOS){ gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info)); rtspsrcOld = gst_bin_get_by_name(GST_BIN(data), "rtspsrc"); if(rtspsrcOld){ qDebug("found rtspsrcOld"); depay = gst_bin_get_by_name(GST_BIN(data), "depay"); gst_element_unlink(rtspsrcOld, depay); gst_bin_remove(GST_BIN(data), rtspsrcOld); //remove old rtspsrc from pipeline, should unlink from depay automatically. rtspsrcNew = gst_element_factory_make("rtspsrc", "rtspsrc"); //create new rtspsrc (works) g_object_set(rtspsrcNew, "location", "rtsp://192.168.50.30/av0_1", "latency", 0, NULL); g_signal_connect(G_OBJECT(rtspsrcNew), "pad-added", G_CALLBACK(on_rtsp_pad_added), data); gst_bin_add(GST_BIN(data), rtspsrcNew); //add the new rtspsrc to the pipeline (works) //gst_element_set_state(GST_ELEMENT(data), GST_STATE_PLAYING); //needed to trip the on_rtsp_pad_added callback return GST_PAD_PROBE_HANDLED; } } return GST_PAD_PROBE_OK; } -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
Free forum by Nabble | Edit this page |