This post was updated on .
Hello there,
I'm trying to create a dynamic pipeline which allows multiple connections to rtsp sources, toggle between them using input-selector and send the data (after decoded) to an appsink. When launching the pipeline everything works fine. The issues occur when I kill one of the RTSP servers. I try to recover from this, so I remove the rtspsrc from the pipeline and try to add it again. Before I remove it, I try to change the state of that rtspsrc element to GST_STATE_NULL. But as I do that, this function call deadlocks. Btw, this only occurs if the RTSP server I kill is not the active one in the input-selector. I tried adding a queue after every rtspsrc and it indeed helped. So instead of it deadlocks every time, it only deadlocks every like 10th time. After I added the queue, the element which causes the deadlock is the queue, not the rtspsrc. If I try to add some sleep time before setting the state to NULL then it will always deadlock once again. So it's some sort of a race condition. I'm sharing parts of my code which are relevant: Bus callback which detects the rtsp errors. After it removes is I wait 1 second before trying to re-establish connection with it again. gboolean GstreamerHandler::handleBusCall(GstBus *, GstMessage *msg) { switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: { // ReInit(); break; } case GST_MESSAGE_ERROR: { gchar *debug; GError *error; qDebug() << "ERROR!!!"; gst_message_parse_error (msg, &error, &debug); // if (!flagFailure) { // flagFailure = TRUE; qCritical("[Video stream error] - Restarting stream every 1 second(s) until connection..."); g_printerr("Debugging info: %s\n", debug); // } g_free (debug); g_error_free (error); QString sourceName = QString(GST_MESSAGE_SRC_NAME(msg)); qDebug() << "Error with " << sourceName; if (m_sources.contains(sourceName)) { QString uri = m_sources[sourceName]->uri; removeSource(uri); QTimer::singleShot(1000, [=]() { retrySource(uri); }); } // FreeAll(); // QTimer::singleShot(1000, this, &GstreamerHandler::ReInit); return TRUE; } default: // qDebug("Message type: %d", GST_MESSAGE_TYPE (msg)); break; } return TRUE; } Remove source function: void GstreamerHandler::removeSource(QString uri) { for (auto it = m_sources.begin(); it != m_sources.end() ; ++it) { const QString &name = it.key(); RtspSource *source = it.value(); if (source->uri == uri) { GstElement *rtsp = source->elementRtsp; GstElement *queue = source->elementQueue; GstState state; qDebug() << "Getting state"; usleep(500); if (gst_element_get_state(rtsp, &state, NULL, GST_CLOCK_TIME_NONE) && (state == GST_STATE_PLAYING)) { qDebug() << "Changing state to NULL"; gst_element_set_state(queue, GST_STATE_NULL); qDebug() << "Queue changed to NULL"; gst_element_set_state(rtsp, GST_STATE_NULL); qDebug() << "State changed"; } gst_bin_remove(GST_BIN(customData.pipeline), rtsp); gst_bin_remove(GST_BIN(customData.pipeline), queue); qDebug() << "Rtsp refcount: " << GST_OBJECT_REFCOUNT_VALUE(rtsp); qDebug() << "queue refcount: " << GST_OBJECT_REFCOUNT_VALUE(queue); delete m_sources.take(name); break; } } qDebug() << "Source was removed..."; } Add source function: void GstreamerHandler::addSource(QString uri) { QString sourceName = QString("rtsp%1").arg(m_numSources); QString queueName = QString("queue%1").arg(m_numSources); qDebug() << "Adding new source:" << sourceName << ": " << uri; GstElement *rtsp = gst_element_factory_make("rtspsrc", sourceName.toLocal8Bit()); GstElement *queue = gst_element_factory_make("queue", queueName.toLocal8Bit()); RtspSource *newSource = new RtspSource(); newSource->elementRtsp = rtsp; newSource->elementQueue = queue; newSource->uri = uri; newSource->nameRtsp = sourceName; newSource->nameQueue = queueName; m_sources.insert(sourceName, newSource); gst_bin_add_many(GST_BIN(customData.pipeline), rtsp, queue, NULL); g_object_set(rtsp, "latency", 0, "location", uri.toLocal8Bit().data(), NULL); g_signal_connect(rtsp, "pad-added", G_CALLBACK(cb_new_rtspsrc_pad), queue); gst_element_set_state(customData.pipeline, GST_STATE_PLAYING); ++m_numSources; qDebug("Source was added!"); } Retry source function: void GstreamerHandler::retrySource(QString uri) { qDebug() << "Retrying source with element: " << uri; addSource(uri); } This is the call stack: libpthread.so.0!__GI___pthread_mutex_lock(pthread_mutex_t * mutex) (/build/glibc-S9d2JN/glibc-2.27/nptl/pthread_mutex_lock.c:115) libgstreamer-1.0.so.0!gst_pad_stop_task (Unknown Source:0) libgstcoreelements.so![Unknown/Just-In-Time compiled code] (Unknown Source:0) libgstreamer-1.0.so.0!gst_pad_set_active (Unknown Source:0) libgstreamer-1.0.so.0![Unknown/Just-In-Time compiled code] (Unknown Source:0) libgstreamer-1.0.so.0!gst_iterator_fold (Unknown Source:0) libgstreamer-1.0.so.0![Unknown/Just-In-Time compiled code] (Unknown Source:0) libgstreamer-1.0.so.0!gst_element_change_state (Unknown Source:0) libgstreamer-1.0.so.0!gst_element_change_state (Unknown Source:0) libgstreamer-1.0.so.0![Unknown/Just-In-Time compiled code] (Unknown Source:0) libXtraOsd.so!GstreamerHandler::removeSource(GstreamerHandler * const this, QString uri) (/home/omer/missioncontroller_mark2/modules/XtraOsd/src/GstreamerHandler.cpp:287) libXtraOsd.so!GstreamerHandler::handleBusCall(GstreamerHandler * const this, GstMessage * msg) (/home/omer/missioncontroller_mark2/modules/XtraOsd/src/GstreamerHandler.cpp:355) ... -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list gstreamer-devel@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
I think the reason it deadlocks, is because it is waiting for the element to
flush all the data downstream before changing states, but the input-selector doesn't take data from that element. This is why this only occurs when the active pad is not part of the rtspsrc that died. How can I flush the buffer of the queue / rtspsrc before changing the state? -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
To flush the queue, you can do something like the following:
- Get the queue sink and src pads - Add a probe to the src pad with a new callback to delete EOS signals - Send an EOS to the sink pad - In the callback function, wait for the EOS to arrive, then delete it. The queue should be flushed. - Set the queue to NULL and do whatever. If you have to flush the rtspsrc, you'll have to send the EOS to that instead of the queue sink pad (I've never done this, though, so can't speak on this). Something like: GstPad * sinkpad = gst_element_get_static_pad(queue, "sink"); GstPad * srcpad = gst_element_get_static_pad(queue, "src"); gst_pad_add_probe(srcpad, GST_PAD_PROBE_TYPE_EVENT, delete_eos_cb, userdata, NULL); gst_pad_send_event(sinkpad, gst_event_new_eos()); gst_object_unref(srcpad); gst_object_unref(sinkpad); // ... static GstPadProbeReturn delete_eos_cb(GstPad* pad, GstPadProbeInfo* info, gpointer userdata) { GstEvent* pad_event = gst_pad_probe_get_event(info); if (pad_event->type == GST_EVENT_EOS) { // EOS went through the element // Do something with rtspsrc and queue return GST_PAD_PROBE_DROP; // Delete the EOS to prevent flushing the entire pipeline } return GST_PAD_PROBE_OK; // Otherwise, pass the event onwards } On a side note, I recently had a problem where I was trying to set an element to NULL before removing it from a pipeline, but this was in deadlock. To solve this, instead of altering the pipeline in the bus message handler, I scheduled a function to run a bit later using something similar to your QTimer::singleShot. Just FYI. -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
This post was updated on .
Thank you for your response.
I actually finally figured it out. I ended up throwing the queue away. Didn't really need it. Before calling gst_element_set_state(...)I flushed the buffer using: gst_pad_push_event(gst_event_new_flush_start())Then I could change the state without a problem. Then I had a different issue where my new rtspsrc didn't send any data to the input-selector. I figured out that I needed to send a flush_stop event so that the input-selector pad can receive new data to its sink pad. I really hope this will help anyone in the future. I googled this forever and couldn't find an answer. I think that the reason that it deadlocked is because data was still present in the element when I tried to change it to NULL. And since this element is not active in the input-selector, it will remain there. So flushing is a must. |
Free forum by Nabble | Edit this page |