I'm writing a server that broadcasts to multiple PulseAudio sinks over the network. It works great when it works. :) Except that it does not tolerate
network failure nicely: it's enough for one of the sinks to go
down, and the stream stops for *all* sinks. In fact, the pipeline
won't even start playing if one of the sinks cannot connect. The pipeline looks like this: playbin2 ! tee ! valve ! queue ! pulsesink Where the "! valve ! queue ! pulsesink" branch is duplicated for
each sink, and in each branch is contained in bin. (I need the
valve in order to allow for safe removal these branches while
playing.) The easiest way to test this is to create a pipeline with two such branches, where one branch points to a network host that doesn't exist. The pipeline won't play. Any ideas on how to proceed? I don't even know which element is
the stubborn one here. I just want the pipeline to ignore the
"bad" sink and work around it! -Tal
_______________________________________________ gstreamer-devel mailing list [hidden email] http://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
On 09/02/2011 04:02 AM, Tal Liron wrote:
0:00:02.157511212 8617 0x1dca6b0 WARN
pulse
pulsesink.c:540:gst_pulseringbuffer_open_device:<sink:pulseaudio:mauss2:>
error: Failed to connect: Connection refused
(Host "mauss2" is indeed turned off.)
Then, I also get this error message from the bus:
GStreamer error: state change failed and some element failed
to post a proper error message with the reason for the failure.
Can this help anyone point me to the right direction?
-Tal _______________________________________________ gstreamer-devel mailing list [hidden email] http://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
In reply to this post by Tal Liron-2
OK! I've solved most of my issues, and it was not easy. I'm posting
this so that others can learn from my (mis)adventures.
My original approach was essentially good. If you want to be able to add/remove sinks dynamically to a playing pipeline, you need something like this: playbin2 ! tee ! valve ! queue2 ! ... ! sink The whole "valve ! queue2 ! ... ! sink" is considered a branch, and you need a whole branch per sink. Note that all the branch elements and the tee are all in one "bin", and that I use a ghost pad to connect that bin to the playbin2 element. Why the "queue2" elements? We need this because we use a "tee", but need every branch to have its own clock. Why the "valve"? You'll see later when we need to remove the branch. First off, before you add any real branch, you want to add one fake one: "queue2 ! fakesink sync=true". Note that we didn't use a valve here, because we're never going to remove this branch. Also note "sync=true", otherwise the fakesink will swallow the whole stream in less than second. This branch guarantees that the pipeline will keep going at the right pace even if no-one is connected. To add a branch, you need to make sure you're in the glib main loop thread. I did this in Python using gst.idle_add(callback). Make sure to return False from your callback! I found out the hard way that if you don't, your callback will *repeatedly execute*. First, build all your branch elements and them to the bin. Then, link them together after the tee, and sync the state in order. From my code: gst.element_link_many(self.tee, valve, queue, sink) valve.sync_state_with_parent() queue.sync_state_with_parent() sink.sync_state_with_parent() That's it! Your branch should be playing now. Removing a branch is much trickier. Again, make sure you are in the glib main loop thread. Then, turn off the valve, and turn : valve.set_property('drop', True) Now comes the tricky part. Before removing our branch elements from the bin, they need to be in state NULL. However, setting state to NULL can often be asynchronous, so you need a way to wait until the state is actually NULL. Here's how: we create a special array called "element_to_remove" which will keep track of them. Then, we ask the elements to stop like so: self.elements_to_remove.append(valve) valve.set_state(gst.STATE_NULL) self.elements_to_remove.append(queue) queue.set_state(gst.STATE_NULL) self.elements_to_remove.append(sink) sink.set_state(gst.STATE_NULL) How do we know when they stop? Well, we set up the usual bus listener in the beggining: bus = self.player.get_bus() bus.add_signal_watch() bus.connect('message', self._on_message) Then, in _on_message we can look for the state change and safely remove our elements: if message.type == gst.MESSAGE_STATE_CHANGED: oldstate, newstate, pending = message.parse_state_changed() element = message.src if newstate == gst.STATE_NULL: remove = False try: self.elements_to_remove.remove(element) remove = True except ValueError: pass if remove: element.set_locked_state(True) self.bin.remove(element) Note that we've also locked the element's state. That's not entirely necessary, but it's just a way to guarantee it doesn't change for some reason. That's it! You can add/remove branches while the pipeline is playing. The other part of my problem with adding fault-tolerance to the networking sinks. I could not manage this with PulseAudio, and would still love to hear suggestions on how. I'm imagining some kind of valve-like element to sit before the sink, and simulate a fakesink whenever the real sink is not behaving well. What I did instead was abandon PulseAudio as the transport, and use RTP on the server and on the clients. The client can then use its local PulseAudio sink. So, it's PulseAudio, just not as the transport protocol. RTP was quite difficult to set up, and is an adventure story for another day... -Tal _______________________________________________ gstreamer-devel mailing list [hidden email] http://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
In reply to this post by Tal Liron-2
OK! I've solved most of my issues, and it was not easy. I'm posting this so that others can learn from my (mis)adventures. My original approach was essentially good. If you want to be able to add/remove sinks dynamically to a playing pipeline, you need something like this: playbin2 ! tee ! valve ! queue2 ! ... ! sink The whole "valve ! queue2 ! ... ! sink" is considered a branch, and you need a whole branch per sink. Note that all the branch elements and the tee are all in one "bin", and that I use a ghost pad to connect that bin to the playbin2 element. Why the "queue2" elements? We need this because we use a "tee", but need every branch to have its own clock. Why the "valve"? You'll see later when we need to remove the branch. First off, before you add any real branch, you want to add one fake one: "queue2 ! fakesink sync=true". Note that we didn't use a valve here, because we're never going to remove this branch. Also note "sync=true", otherwise the fakesink will swallow the whole stream in less than second. This branch guarantees that the pipeline will keep going at the right pace even if no-one is connected. To add a branch, you need to make sure you're in the glib main loop thread. I did this in Python using gst.idle_add(callback). Make sure to return False from your callback! I found out the hard way that if you don't, your callback will *repeatedly execute*. First, build all your branch elements and them to the bin. Then, link them together after the tee, and sync the state in order. From my code: gst.element_link_many(self.tee, valve, queue, sink) valve.sync_state_with_parent() queue.sync_state_with_parent() sink.sync_state_with_parent() That's it! Your branch should be playing now. Removing a branch is much trickier. Again, make sure you are in the glib main loop thread. Then, turn off the valve, and turn : valve.set_property('drop', True) Now comes the tricky part. Before removing our branch elements from the bin, they need to be in state NULL. However, setting state to NULL can often be asynchronous, so you need a way to wait until the state is actually NULL. Here's how: we create a special array called "element_to_remove" which will keep track of them. Then, we ask the elements to stop like so: self.elements_to_remove.append(valve) valve.set_state(gst.STATE_NULL) self.elements_to_remove.append(queue) queue.set_state(gst.STATE_NULL) self.elements_to_remove.append(sink) sink.set_state(gst.STATE_NULL) How do we know when they stop? Well, we set up the usual bus listener in the beggining: bus = self.player.get_bus() bus.add_signal_watch() bus.connect('message', self._on_message) Then, in _on_message we can look for the state change and safely remove our elements: if message.type == gst.MESSAGE_STATE_CHANGED: oldstate, newstate, pending = message.parse_state_changed() element = message.src if newstate == gst.STATE_NULL: remove = False try: self.elements_to_remove.remove(element) remove = True except ValueError: pass if remove: element.set_locked_state(True) self.bin.remove(element) Note that we've also locked the element's state. That's not entirely necessary, but it's just a way to guarantee it doesn't change for some reason. That's it! You can add/remove branches while the pipeline is playing. The other part of my problem with adding fault-tolerance to the networking sinks. I could not manage this with PulseAudio, and would still love to hear suggestions on how. I'm imagining some kind of valve-like element to sit before the sink, and simulate a fakesink whenever the real sink is not behaving well. What I did instead was abandon PulseAudio as the transport, and use RTP on the server and on the clients. The client can then use its local PulseAudio sink. So, it's PulseAudio, just not as the transport protocol. RTP was quite difficult to set up, and is an adventure story for another day... -Tal _______________________________________________ gstreamer-devel mailing list [hidden email] http://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
Free forum by Nabble | Edit this page |