How to add fault tolerance for multiple pulsesinks

How to add fault tolerance for multiple pulsesinks

Tal Liron-2

I'm writing a server that broadcasts to multiple PulseAudio sinks over the network.

It works great when it works. :) Except that it does not tolerate network failure nicely: it's enough for one of the sinks to go down, and the stream stops for *all* sinks. In fact, the pipeline won't even start playing if one of the sinks cannot connect.

The pipeline looks like this:

playbin2 ! tee ! valve ! queue ! pulsesink

Where the "! valve ! queue ! pulsesink" branch is duplicated for each sink, and each branch is contained in a bin. (I need the valve in order to allow for safe removal of these branches while playing.)

The easiest way to test this is to create a pipeline with two such branches, where one branch points to a network host that doesn't exist. The pipeline won't play.
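
For anyone who wants to reproduce this, here is a minimal sketch of how the sink bin could be written as a launch-style description. It assumes each sink is addressed through pulsesink's "server" property; the helper name and the host names are made up for illustration and are not from my real code:

```python
def sink_bin_description(servers):
    """Return a gst-launch style description: one tee, one branch per server."""
    parts = ['tee name=t']
    for server in servers:
        # Every branch hangs off the same tee and ends in its own pulsesink.
        parts.append('t. ! valve ! queue ! pulsesink server=%s' % server)
    return ' '.join(parts)


# Hypothetical hosts; the second one is meant to be unreachable.
description = sink_bin_description(['localhost', 'no-such-host'])
print(description)
```

With pygst 0.10 it should be possible to pass the result to gst.parse_bin_from_description(description, True) and set the resulting bin as the "audio-sink" property of playbin2.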

Any ideas on how to proceed? I don't even know which element is the stubborn one here. I just want the pipeline to ignore the "bad" sink and work around it!

-Tal



_______________________________________________
gstreamer-devel mailing list
[hidden email]
http://lists.freedesktop.org/mailman/listinfo/gstreamer-devel

Re: How to add fault tolerance for multiple pulsesinks

Tal Liron

On 09/02/2011 04:02 AM, Tal Liron wrote:

Any ideas on how to proceed? I don't even know which element is the stubborn one here. I just want the pipeline to ignore the "bad" sink and work around it!


Here's some more information. When I try to start the pipeline with that one bad sink, I see these warnings (GST_DEBUG=2):


0:00:02.157511212  8617      0x1dca6b0 WARN                   pulse pulsesink.c:540:gst_pulseringbuffer_open_device:<sink:pulseaudio:mauss2:> error: Failed to connect: Connection refused
0:00:02.158062292  8617      0x1dca6b0 WARN                playsink gstplaysink.c:1869:gen_audio_chain:<playsink0> error: Configured audiosink bin0 is not working.
0:00:02.158876109  8617      0x1dca6b0 WARN                 flacdec gstflacdec.c:1264:gst_flac_dec_sink_event:<flacdec0> couldn't convert time => samples


(Host "mauss2" is indeed turned off.)


Then, I also get this error message from the bus:


GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
gstplaysink.c(1869): gen_audio_chain (): /GstPlayBin2:Player/GstPlaySink:playsink0


Can this help anyone point me in the right direction?


-Tal



Re: How to add fault tolerance for multiple pulsesinks

Tal Liron-2
In reply to this post by Tal Liron-2
OK! I've solved most of my issues, and it was not easy. I'm posting this so that others can learn from my (mis)adventures.

My original approach was essentially good. If you want to be able to add/remove sinks dynamically to a playing pipeline, you need something like this:

playbin2 ! tee ! valve ! queue2 ! ... ! sink

Each "valve ! queue2 ! ... ! sink" chain is one branch, and you need one branch per sink. Note that the tee and all the branch elements live in a single "bin", and that I use a ghost pad to connect that bin to the playbin2 element.

Why the "queue2" elements? We need them because we use a "tee": a queue gives each branch its own streaming thread, so one slow or blocked branch doesn't stall the others.

Why the "valve"? You'll see later when we need to remove the branch.

First off, before you add any real branch, you want to add one fake one: "queue2 ! fakesink sync=true". Note that we didn't use a valve here, because we're never going to remove this branch. Also note "sync=true"; without it, the fakesink will swallow the whole stream in less than a second. This branch guarantees that the pipeline will keep going at the right pace even if no one is connected.
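
As a rough sketch of that fake branch (not my exact code): the helper below takes the element factory as a parameter, so it only assumes something with the calling convention of gst.element_factory_make(factory_name, element_name). The helper name and the element names are invented for the example.

```python
def make_keepalive_branch(make_element):
    """Build the permanent "queue2 ! fakesink sync=true" branch.

    make_element is any factory with the calling convention of
    gst.element_factory_make(factory_name, element_name).
    """
    queue = make_element('queue2', 'keepalive-queue')
    sink = make_element('fakesink', 'keepalive-sink')
    # Without sync=true the fakesink consumes data as fast as it arrives
    # instead of pacing itself against the clock.
    sink.set_property('sync', True)
    return [queue, sink]
```

The returned elements still have to be added to the bin and linked after the tee, the same way as a real branch.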

To add a branch, you need to make sure you're in the glib main loop thread. I did this in Python using gobject.idle_add(callback). Make sure to return False from your callback! I found out the hard way that if you don't, your callback will *repeatedly execute*.
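
One way to avoid forgetting the return value is a small wrapper. This is only a sketch; the name run_once is made up:

```python
import functools


def run_once(func):
    """Wrap func so that an idle handler built from it never repeats."""
    @functools.wraps(func)
    def callback(*args, **kwargs):
        func(*args, **kwargs)
        # False tells the main loop to drop this idle source after one call,
        # whatever func itself returned.
        return False
    return callback
```

The wrapped function can then be scheduled from any thread, e.g. gobject.idle_add(run_once(add_branch), sink), where add_branch stands for whatever function does the actual work.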

First, build all your branch elements and add them to the bin. Then, link them together after the tee, and sync the state in order. From my code:

gst.element_link_many(self.tee, valve, queue, sink)
valve.sync_state_with_parent()
queue.sync_state_with_parent()
sink.sync_state_with_parent()

That's it! Your branch should be playing now.
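
Putting the steps above together, a generic helper might look like the sketch below. It is duck-typed: it only relies on bin.add(), element.link() and element.sync_state_with_parent(), and the function name is invented for the example.

```python
def add_branch(bin_, tee, elements):
    """Add elements to bin_, link them in order after tee, then start them."""
    for element in elements:
        bin_.add(element)
    # Link tee -> first element -> ... -> last element (the sink).
    chain = [tee] + list(elements)
    for upstream, downstream in zip(chain, chain[1:]):
        upstream.link(downstream)
    # Sync states in the same order the elements were linked.
    for element in elements:
        element.sync_state_with_parent()
```

Called as add_branch(self.bin, self.tee, [valve, queue, sink]), it performs the same calls as the snippet above, plus the step of adding the elements to the bin.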

Removing a branch is much trickier. Again, make sure you are in the glib main loop thread. Then, close the valve so that it drops everything flowing into the branch:

valve.set_property('drop', True)

Now comes the tricky part. Before removing our branch elements from the bin, they need to be in state NULL. However, setting state to NULL can often be asynchronous, so you need a way to wait until the state is actually NULL. Here's how: we keep a list called "elements_to_remove" that tracks the elements we are waiting on. Then, we ask the elements to stop like so:

self.elements_to_remove.append(valve)
valve.set_state(gst.STATE_NULL)
self.elements_to_remove.append(queue)
queue.set_state(gst.STATE_NULL)
self.elements_to_remove.append(sink)
sink.set_state(gst.STATE_NULL)

How do we know when they stop? Well, we set up the usual bus listener in the beginning:

bus = self.player.get_bus()
bus.add_signal_watch()
bus.connect('message', self._on_message)

Then, in _on_message we can look for the state change and safely remove our elements:

if message.type == gst.MESSAGE_STATE_CHANGED:
    oldstate, newstate, pending = message.parse_state_changed()
    element = message.src
    # Only act on elements we asked to stop, once they have reached NULL
    if newstate == gst.STATE_NULL and element in self.elements_to_remove:
        self.elements_to_remove.remove(element)
        # Lock the state so it cannot change again before removal
        element.set_locked_state(True)
        self.bin.remove(element)
 
Note that we've also locked the element's state. That's not entirely necessary, but it's just a way to guarantee it doesn't change for some reason.
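
The removal bookkeeping can also be pulled into a small class of its own. The sketch below is duck-typed and takes the NULL state as a parameter (gst.STATE_NULL in my case); the class and method names are invented for the example.

```python
class BranchRemover(object):
    """Track elements that were asked to stop and remove them once in NULL."""

    def __init__(self, bin_, null_state):
        self.bin = bin_
        self.null_state = null_state
        self.pending = []

    def request_removal(self, elements):
        for element in elements:
            # Register first, so a quick state change is never missed.
            self.pending.append(element)
            element.set_state(self.null_state)

    def on_state_changed(self, element, newstate):
        """Call from the bus handler; returns True if element was removed."""
        if newstate != self.null_state or element not in self.pending:
            return False
        self.pending.remove(element)
        element.set_locked_state(True)
        self.bin.remove(element)
        return True
```

In the bus handler this would be called as remover.on_state_changed(message.src, newstate) for every state-changed message.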

That's it! You can add/remove branches while the pipeline is playing.

The other part of my problem was adding fault tolerance to the networking sinks. I could not manage this with PulseAudio, and would still love to hear suggestions on how. I'm imagining some kind of valve-like element that sits before the sink and simulates a fakesink whenever the real sink is not behaving well. What I did instead was abandon PulseAudio as the transport, and use RTP on the server and on the clients. The client can then use its local PulseAudio sink. So, it's PulseAudio, just not as the transport protocol. RTP was quite difficult to set up, and is an adventure story for another day...
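
For the record, one partial workaround that is not what I ended up doing: check that a host accepts TCP connections before building a branch for it. This is only a sketch. It assumes the remote PulseAudio server listens on the default native-protocol TCP port 4713, the function name is made up, and it does nothing for a sink that dies in the middle of playback.

```python
import socket

PULSE_NATIVE_PORT = 4713  # default TCP port of PulseAudio's native protocol


def pulse_server_reachable(host, port=PULSE_NATIVE_PORT, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        connection = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        # Refused, unreachable, unresolvable or timed out: treat as down.
        return False
    connection.close()
    return True
```

A branch would then only be added for hosts where pulse_server_reachable(host) returns True, so a machine that is switched off never makes it into the pipeline.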

-Tal
