Restart Pipeline

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Restart Pipeline

killerrats
Administrator
This post was updated on .
       ------ rtph264depay --- h264parse ---- tee --------- queue ---
rtspsrc                                                                                avimux --- appsink
       ------ rtpmp4gdepay --- aacparse ----- tee --------- queue ---

I successfully restarted the pipeline but when I go to play again it says,
"cannot link the depay elements". It will work the first time but not the
second time. I setup the signal "pad-removed" for the rtspsrc but it says
the pads are not linked. Any ideas?

steps I take:

Stop the Pipe:
---------------
EOS through pipeline
release the pads on tees
set pipeline to NULL
disconnect the signals

Restart:
----------
pipeline to PLAYING
setup signals



-----
------------------------------
Gstreamer 1.12.4
------------------------------
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
gstreamer-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
------------------------------
Gstreamer 1.16.2
------------------------------
Windows
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

killerrats
Administrator
This post was updated on .
I found out that if I grabbed out the element from the pipeline and assigned
it like i just created it in the beginning it will reset the depays (example
below). the only problem is that it gets to a point where it says internal
stream error after awhile. I don't understand that.

if i don't do this it will come up with the cannot link the audio and video
pads to each of the depays. the first marks are from the "pad-removed"
signal which show the pads are unlinked from the previous play. marks below
are the new ones coming in after a restart.

<http://gstreamer-devel.966125.n4.nabble.com/file/t377034/rtspsrc_link_error.jpg

GstElement* pipe,* source,* depay;

void unlink();

int main()
{
    GMainLoop* loop;

    pipe = gst_pipeline_new("pipeline");
    source = gst_element_factory_make("rtspsrc", "source");
    depay =  = gst_element_factory_make("rtph264depay", "depay");

    gst_element_set_state(pipe, GST_STATE_PLAYING);

    do
    {
      g_main_loop_run(loop);
      unlink();
    }
}

void unlink()
{
     depay = gst_bin_get_by_name(GST_BIN(pipe), "depay");
     
     gst_element_set_state(pipe, GST_STATE_NULL);

     gst_bin_remove(GST_BIN(pipe),depay);

     gst_bin_add(GST_BIN(pipe),depay);

     gst_element_set_state(pipe, GST_STATE_PLAYING);
}



-----
------------------------------
Gstreamer 1.12.4
------------------------------
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
gstreamer-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
------------------------------
Gstreamer 1.16.2
------------------------------
Windows
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

killerrats
Administrator
I even tried using the parse launch and did the same thing. do I have to do
something to clear the depays before running again?

// ---------------- CODE ----------------------

this->srcPipeline = gst_parse_launch("rtspsrc name=source ! rtph264depay
name=videodepay ! h264parse name=videoparse ! avimux name=avimux ! appsink
name=appsink source. ! rtpmp4gdepay name=audiodepay ! aacparse
name=audioparse ! mux.", NULL);
        //this->srcPipeline = gst_pipeline_new("pipeline");
        this->source = gst_bin_get_by_name(GST_BIN(this->srcPipeline), "source");
        this->rtpdepay = gst_bin_get_by_name(GST_BIN(this->srcPipeline),
"videodepay");
        this->parse = gst_bin_get_by_name(GST_BIN(this->srcPipeline), "vidparse");
        this->audioRtpDepay = gst_bin_get_by_name(GST_BIN(this->srcPipeline),
"audiodepay");
        this->audioParse = gst_bin_get_by_name(GST_BIN(this->srcPipeline),
"audioparse");
        this->mux = gst_bin_get_by_name(GST_BIN(this->srcPipeline), "avimux");
        this->appsink = gst_bin_get_by_name(GST_BIN(this->srcPipeline), "appsink");

        g_object_set(this->source
                , "location", "[IP]", NULL);

        g_object_set(GST_OBJECT(this->appsink), "emit-signals", TRUE
                , "max-buffers", 1
                , "enable-last-sample", FALSE, NULL);

        g_signal_connect(appsink, "new-sample", G_CALLBACK(appsink_ToFile),
apService);

        this->bus = gst_element_get_bus(this->srcPipeline);

        gst_bus_add_signal_watch(this->bus);
        g_signal_connect(this->bus, "message", G_CALLBACK(bus_cb), apService);

        do
        {

                /* Start playing */
                if (gst_element_set_state(this->srcPipeline, GST_STATE_PLAYING) ==
GST_STATE_CHANGE_FAILURE) {
                        g_printerr("Unable to set the pipeline to the playing state.\n");
                        gst_object_unref(this->srcPipeline);
                        {
                                return;
                        }
                }

                this->aEstablishedConnection = true;
                this->mLoop = g_main_loop_new(NULL, FALSE);
                g_main_loop_run(this->mLoop);
                this->aEstablishedConnection = false;
                this->gPlayingInterval = 0;
                this->gPausedTimeout = 0;

                /* Free resources */
                g_main_loop_unref(this->mLoop);

                gst_element_set_state(this->srcPipeline, GST_STATE_NULL);

                gst_element_set_state(this->source, GST_STATE_NULL);
                gst_element_set_state(this->rtpdepay, GST_STATE_NULL);
                gst_element_set_state(this->parse, GST_STATE_NULL);
                gst_element_set_state(this->audioRtpDepay, GST_STATE_NULL);
                gst_element_set_state(this->audioParse, GST_STATE_NULL);
                gst_element_set_state(this->mux, GST_STATE_NULL);
                gst_element_set_state(this->appsink, GST_STATE_NULL);

                g_object_set(this->source
                        , "location", "[IP]", NULL);
}while(!this->aStopProgram);



-----
------------------------------
Gstreamer 1.12.4
------------------------------
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
------------------------------
Gstreamer 1.16.2
------------------------------
Windows
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

WisdomPill
In reply to this post by killerrats
I'm facing the same issue and I was curious to know if there's any
recommended way to restart the pipeline in case of connection lost. I solved
reinitializing the pipeline, signal watchers ecc... But I don't think it's a
nice way to solve it.
My problem is that rtspsrc gives an error when the connection is lost and if
I try to play again the pipeline all the elements the last one
"autovideosink" remians in state ready. The pipeline is the following
"rtspsrc location={} ! rtpjitterbuffer ! rtph264depay ! avdec_h264 !
videoconvert ! autovideosink sync=false".

Hope it helps!



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

killerrats
Administrator
I even tried going back to 1.6.0 to see if it works to stop and then play
again and does the same thing.

well I figured out how to reinitialize the pipeline but in long term it
seems to have a memory leak problem. at least when I don't use the parse
launch method. the comparisons on using parse launch and not for memory.
this is unreference everything i used for the pipeline. I don't know if i'm
doing something wrong or what.

over 12hr period:
parse launch:
      average 13.6mb. I can't really tell if it will increase the memory
usage.
no parse launch:
     starts off at the 10mb. it seems to increase in memory over time. so
far it's over 28mb.

glong appsinkId, busId;
        do
        {
                this->srcPipeline = gst_parse_launch("rtspsrc name=source ! rtph264depay
name=videodepay ! h264parse name=videoparse ! avimux name=avimux ! appsink
name=appsink source. ! rtpmp4gdepay ! aacparse ! mux.", NULL);
                this->source = gst_bin_get_by_name(GST_BIN(this->srcPipeline), "source");
                this->appsink = gst_bin_get_by_name(GST_BIN(this->srcPipeline),
"appsink");

                g_object_set(this->source
                        , "location", "[IP]", NULL);

                g_object_set(GST_OBJECT(this->appsink), "emit-signals", TRUE
                        , "max-buffers", 1
                        , "enable-last-sample", FALSE, NULL);

                appsinkId = g_signal_connect(appsink, "new-sample",
G_CALLBACK(appsink_ToFile), apService);

                this->bus = gst_element_get_bus(this->srcPipeline);

                gst_bus_add_signal_watch(this->bus);
                busId = g_signal_connect(this->bus, "message", G_CALLBACK(bus_cb),
apService);


                /* Start playing */
                if (gst_element_set_state(this->srcPipeline, GST_STATE_PLAYING) ==
GST_STATE_CHANGE_FAILURE) {
                        g_printerr("Unable to set the pipeline to the playing state.\n");
                        gst_object_unref(this->srcPipeline);
                        {
                                return;
                        }
                }

                this->aEstablishedConnection = true;
                this->mLoop = g_main_loop_new(NULL, FALSE);
                g_main_loop_run(this->mLoop);
                this->aEstablishedConnection = false;
                this->gPlayingInterval = 0;
                this->gPausedTimeout = 0;

                /* Free resources */
                g_main_loop_unref(this->mLoop);

                gst_element_set_state(this->srcPipeline, GST_STATE_PAUSED);
                gst_element_set_state(this->srcPipeline, GST_STATE_NULL);
                g_signal_handler_disconnect(this->appsink, appsinkId);
                g_signal_handler_disconnect(this->bus, busId);
                gst_object_unref(this->source);
                gst_object_unref(this->appsink);

                gst_bus_remove_signal_watch(this->bus);
                gst_object_unref(this->bus);
                gst_object_unref(this->srcPipeline);

        } while (!this->aStopProgram);



-----
------------------------------
Gstreamer 1.12.4
------------------------------
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
------------------------------
Gstreamer 1.16.2
------------------------------
Windows
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

killerrats
Administrator
In reply to this post by WisdomPill
I know if you use the bus message error coming and check the source for the
error it will let you quit when the connection is lost. once it gets the
signal then you can quit accordingly. unless that is what you have done
already.

Bus Messages <http://www.planetkorey.com/tmp/busMessage.cpp>  



-----
------------------------------
Gstreamer 1.12.4
------------------------------
Windows
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
------------------------------
Gstreamer 1.16.2
------------------------------
Windows
Reply | Threaded
Open this post in threaded view
|

Re: Restart Pipeline

WisdomPill
I solved the issue adding the pad-added signal to the rtspsrc and unlinking
rtspsrc from the rtpjitterbuffer.
My code is the following, but it's Python.

import logging
import gi
from threading import Thread, Lock
from time import sleep

from lib.utils import get_pad_info

gi.require_version('GstVideo', '1.0')
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')

# GstVideo is needed for set_window_handle(), dont't know why pycharm
removes it
from gi.repository import Gst, Gtk, Gdk, Pango, GstVideo, GObject


class GstWidget(Gtk.DrawingArea):
    def __init__(self, stream, parent, conf):
        super(GstWidget, self).__init__()
        self.stream = stream
        label_height = conf.label_height
        restore_pipeline_period = conf.restore_pipeline_period

        self.location = 'rtsp://{}:{}/{}'.format(stream.host, stream.port,
stream.path)

        self.log = logging.getLogger('visualizer')

        self.log.info('[{}] Initializing stream label of
{}'.format(stream.name, stream.name))
        self.label = Gtk.Label()
        stream_rectangle = stream.rectangle
        self.label.set_size_request(stream_rectangle.width, label_height)
        self.label.override_background_color(Gtk.StateType.NORMAL,
Gdk.RGBA(1.0, 1.0, 1.0, 1.0))
       
self.label.modify_font(Pango.font_description_from_string('font-size: 20'))
        parent.put(self.label, stream.rectangle.x, stream.rectangle.y +
stream_rectangle.height)
        self.log.info('[{}] Done label!'.format(stream.name))

        stream_rectangle = self.stream.rectangle
        self.set_size_request(stream_rectangle.width,
stream_rectangle.height)

        self.stream_connection_thread = None
        self.reconnecting = False
        self.lock = Lock()
        self.closed = False

        self.player = Gst.Pipeline.new('player')
        source = Gst.ElementFactory.make('rtspsrc', 'source')
        source.set_property('location', self.location)
        source.connect('pad-added', self.on_pad_added)
        jitter = Gst.ElementFactory.make('rtpjitterbuffer', 'jitter')
        depayer = Gst.ElementFactory.make('rtph264depay', 'depayer')
        decoder = Gst.ElementFactory.make('avdec_h264', 'decoder')
        converter = Gst.ElementFactory.make('videoconvert', 'converter')
        if conf.timers_overlay:
            timeoverlay = Gst.ElementFactory.make('timeoverlay',
'timeoverlay')
            clockoverlay = Gst.ElementFactory.make('clockoverlay',
'clockoverlay')
            clockoverlay.set_property('halignment', 'right')
        sink = Gst.ElementFactory.make('autovideosink', 'sink')
        sink.set_property('sync', False)

        self.player.add(source)
        self.player.add(jitter)
        self.player.add(depayer)
        self.player.add(decoder)
        self.player.add(converter)
        if conf.timers_overlay:
            self.player.add(timeoverlay)
            self.player.add(clockoverlay)
        self.player.add(sink)

        source.link(jitter)
        jitter.link(depayer)
        depayer.link(decoder)
        if conf.timers_overlay:
            decoder.link(timeoverlay)
            timeoverlay.link(clockoverlay)
            clockoverlay.link(converter)
        else:
            decoder.link(converter)
        converter.link(sink)

        self.bus = self.player.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message::warning', self.on_warning)
        self.bus.connect('message::error', self.on_error)
        self.bus.connect('message::state-changed', self.on_status_changed)
        self.bus.connect('message::eos', self.on_eos)
        self.bus.enable_sync_message_emission()
        self.bus.connect('sync-message::element', self.set_frame_handle)
        self.restore_pipeline()
        if conf.restore_pipeline_period != 0:
            GObject.timeout_add_seconds(restore_pipeline_period * 60,
self.restore_pipeline)

    def set_online_label(self):
        self.log.info('[{}] {} is online'.format(self.stream.name,
self.stream.name))
        self.label.set_text(self.stream.name)

    def set_offline_label(self):
        self.log.info('[{}] {} is offline'.format(self.stream.name,
self.stream.name))
        self.label.set_text('{} is offline'.format(self.stream.name))

    def on_status_changed(self, bus, message):
        msg = message.parse_state_changed()
        self.log.info('[{}] status_changed message ->
{}'.format(self.stream.name, msg))
        if msg.newstate == Gst.State.PLAYING:
            self.set_online_label()
        else:
            self.set_offline_label()

    def send_eos(self):
        self.log.debug('[{}] Sending eos'.format(self.stream.name))
        self.player.send_event(Gst.Event.new_eos())

    def on_eos(self, bus, message):
        self.log.warning('[{}] eos message -> {}'.format(self.stream.name,
message))
        self.restore_pipeline()

    def on_warning(self, bus, message):
        self.log.warning('[{}] warning message ->
{}'.format(self.stream.name, message.parse_warning()))
        self.restore_pipeline()

    def on_error(self, bus, message):
        self.log.error('[{}] error message -> {}'.format(self.stream.name,
message.parse_error()))
        self.restore_pipeline()

    def activate_reconnection(self):
        connection_thread = self.stream_connection_thread
        self.log.warning('[{}] connection thread {}, player.current_state
{}'.format(self.stream.name, connection_thread, self.player.current_state))
        if self.player.current_state == Gst.State.NULL and \
                (connection_thread is None or (connection_thread and not
connection_thread.is_alive())):
            self.stream_connection_thread =
Thread(target=self.reconnect_stream)
            self.stream_connection_thread.start()
        self.log.warning('[{}] I supposedly lost the
connection!'.format(self.stream.name))

    def reconnect_stream(self):
        self.log.info('[{}] reconnecting the
stream!'.format(self.stream.name))
        while self.player.current_state != Gst.State.PLAYING:
            # Applied De Morgan rules !playing && !closed => !(playing or
closed)
            self.log.info('[{}] I\'m trying to
reconnect!'.format(self.stream.name))
            self.play()
            sleep(self.stream.sleep_time)
            self.log.info('[{}] Just woke up'.format(self.stream.name))

    def stop(self):
        self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
        self.log.info('[{}] setting pipeline state to
null'.format(self.stream.name))
        self.set_player_state(Gst.State.NULL)

    def play(self):
        self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
        self.log.info('[{}] setting pipeline state to
playing'.format(self.stream.name))
        outcome = self.set_player_state(Gst.State.PLAYING)
        if outcome == Gst.StateChangeReturn.ASYNC:
            self.log.warning('[{}] Got ASYNC state change in
return'.format(self.stream.name))

    def restore_pipeline(self):
        self.log.info('[{}] asking for the lock'.format(self.stream.name))
        self.lock.acquire()
        if not self.reconnecting:
            self.reconnecting = True
        else:
            self.lock.release()
            self.log.info('[{}] released the lock'.format(self.stream.name))
            return True
        self.lock.release()
        self.log.info('[{}] released the lock'.format(self.stream.name))

        while self.player.current_state != Gst.State.NULL:
            self.stop()
        self.set_offline_label()

        source = self.player.get_child_by_name('source')
        jitter = self.player.get_child_by_name('jitter')
        ret = source.unlink(jitter)
        self.log.info('[{}] unlinking source to jitter got me
{}'.format(self.stream.name, ret))

        self.activate_reconnection()

        self.log.info('[{}] asking for the lock'.format(self.stream.name))
        self.lock.acquire()
        self.reconnecting = False
        self.lock.release()
        self.log.info('[{}] released the lock'.format(self.stream.name))
        return True

    def on_pad_added(self, element, pad):
        self.log.warning('[{}] pad-added to {}'.format(self.stream.name,
pad.name))
        jitter = self.player.get_child_by_name('jitter')
        jitter_pad = jitter.get_static_pad('sink')
        retval = pad.link(jitter_pad)
        self.log.warning('[{}] linked pad got me
{}'.format(self.stream.name, retval))

    def set_player_state(self, state):
        outcome = self.player.set_state(state)
        self.log.info('[{}] Called status change to {} got me
{}'.format(self.stream.name, state, outcome))
        return outcome

    def close(self):
        self.closed = True
        self.player.set_state(Gst.State.NULL)

    def status(self):
        response = {
            'player_state': self.player.current_state.value_nick,
            'label': self.label.get_text()
        }

        recon_thread = self.stream_connection_thread

        if recon_thread:
            response['reconnection_thread'] = {
                'alive': recon_thread.is_alive(),
                'name': recon_thread.name,
                'daemon': recon_thread.daemon
            }

        pipeline = self.player

        response['pipeline'] = {
            'pads': {
                pad.name: get_pad_info(pad) for pad in pipeline.pads
            }
        }

        clock = pipeline.clock
        if clock:
            response['pipeline']['clock'] = {
                'time': clock.get_time(),
                'resolution': clock.get_resolution(),
                'timeout': clock.get_timeout(),
                'internal time': clock.get_internal_time(),
                'floating': clock.is_floating(),
                'synced': clock.is_synced(),
                'name': clock.get_name(),
                'calibration': clock.get_calibration()
            }

        response['elements'] = [
            {
                'name': child.name,
                'state': child.current_state.value_nick,
                'flags': child.flags,
                'pads': {
                    pad.name: get_pad_info(pad) for pad in child.pads
                }
            }
            for child in pipeline.children]

        return response

    def set_frame_handle(self, bus, message):
        if message.get_structure().get_name() == 'prepare-window-handle':
            frame = message.src
            frame.set_property('force-aspect-ratio', True)
            Gdk.threads_enter()
            frame.set_window_handle(self.get_window().get_xid())
            Gdk.threads_leave()





--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel