appsrc + decodebin


baldman88
Hi everyone, and sorry for my English. I have searched for an answer to this question without success. I need to decode MP3 data to raw PCM. I tried to adapt a streaming example from GitHub, but I have done something wrong. The data comes from an external buffer (the total size of the stream is not known at the start; for testing I read chunks from a file), is decoded with decodebin, and should finally be pushed as raw data to an appsink. To begin with, I am sending the decoded data to alsasink instead, because that is much easier to test. The problem is that instead of the sound I hear noise, something like fast-forward playback. It looks as if every new buffer read in the "need-data" handler starts playing immediately. I have tried changing various appsrc and decodebin settings, but nothing has any effect. What am I doing wrong?
Here is the code:

#include <fstream>
 
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
 
 
#define BUF_SIZE 8192
 
std::ifstream inFile("in2.mp3", std::fstream::binary);
 
 
static void onNewPad(GstElement *decodebin, GstPad *pad, GstElement *userData)
{
    GstPad *newpad;
    newpad = gst_element_get_static_pad(GST_ELEMENT(userData), "sink");
    gst_pad_link(pad, newpad);
    g_object_unref(newpad);
}
 
 
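static void onNeedData(GstAppSrc *appsrc, guint dataSize, GstElement *userData)
{
    // Read straight into memory owned by the GstBuffer. Pointing the buffer
    // at a stack array with gst_buffer_set_data() leaves it dangling once
    // this callback returns, because appsrc consumes the buffer later.
    GstBuffer *buffer = gst_buffer_new_and_alloc(BUF_SIZE);
    inFile.read((char *) GST_BUFFER_DATA(buffer), BUF_SIZE);
    guint realSize = inFile.gcount();
    if (realSize > 0)
    {
        GST_BUFFER_SIZE(buffer) = realSize;
        // appsrc takes ownership of the buffer
        gst_app_src_push_buffer(appsrc, buffer);
    }
    else
    {
        gst_buffer_unref(buffer);
        gst_app_src_end_of_stream(appsrc);
    }
}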
 
 
static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data)
{
    GMainLoop *loop = (GMainLoop *) data;
    switch(GST_MESSAGE_TYPE(msg))
    {
        case GST_MESSAGE_EOS:
        {
            g_main_loop_quit(loop);
            break;
        }
        case GST_MESSAGE_ERROR:
        {
            gchar *debug;
            GError *error;
            gst_message_parse_error(msg, &error, &debug);
            g_free(debug);
            g_printerr("Error: %s\n", error->message);
            g_error_free(error);
            g_main_loop_quit(loop);
            break;
        }
        default:
        {
            break;
        }
    }
    return TRUE;
}
 
 
int main(int argc, char **argv)
{
    gst_init(NULL, NULL);
    GMainLoop *loop;
    GstElement *appsrc;
    GstElement *alsasink;
    GstElement *decodebin;
    GstElement *pipeline;
 
    loop = g_main_loop_new(NULL, FALSE);
    appsrc = gst_element_factory_make("appsrc", "appsrc");
    alsasink = gst_element_factory_make("alsasink", "alsasink");
    decodebin = gst_element_factory_make("decodebin2", "decodebin");
    pipeline = gst_pipeline_new("pipeline");
 
    gst_bin_add_many(GST_BIN(pipeline), appsrc, decodebin, alsasink, NULL);
    gst_element_link_many(appsrc, decodebin, NULL);
 
    g_object_set(G_OBJECT(appsrc), "stream-type", GST_APP_STREAM_TYPE_STREAM, "format", GST_FORMAT_BYTES, "do-timestamp", FALSE, "is-live", TRUE, "min-percent", 20, "block", FALSE, "max-bytes", 1000000, NULL);
 
    g_object_set(G_OBJECT(decodebin), "high-percent", 90, "low-percent", 10, "max-size-buffers", 0, "max-size-time", 0, "max-size-bytes", 0, "use-buffering", TRUE, NULL);
 
    g_signal_connect(G_OBJECT(decodebin), "pad-added", G_CALLBACK(onNewPad), alsasink);
 
    g_signal_connect(G_OBJECT(appsrc), "need-data", G_CALLBACK(onNeedData), appsrc);
 
    GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    gst_bus_add_watch (bus, bus_call, loop);
    gst_object_unref(bus);
 
    gst_element_set_state(pipeline, GST_STATE_PLAYING);
    g_main_loop_run(loop);
    gst_element_set_state(pipeline, GST_STATE_NULL);
    // The loop has already quit here; release the references instead
    gst_object_unref(pipeline);
    g_main_loop_unref(loop);
    return 0;
}

Best regards.

Re: appsrc + decodebin

dgoiko



Hi there. You need to handle uridecodebin's "source-setup" signal and, from
that handler, connect the source's "need-data" signal to a callback
(on_source_need_data in my code). My code has a glitch: the source emits
"need-data" BEFORE I have opened my own stream from the outside. To work
around that, I put a small wait loop inside the callback. That is acceptable
only while testing; a more robust solution should be found.
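
One possible way to avoid the polling loop, sketched under assumptions: let
the reader wait on a threading.Event that is set once the stream has been
opened. StreamGate and its method names are hypothetical and not part of the
script in this post. The read still blocks the calling thread until a stream
arrives or the timeout expires; it just no longer wakes up every second.

```python
import io
import threading


class StreamGate:
    """Holds the input stream and lets a reader wait until one is set.

    Hypothetical helper, not part of GStreamer or of the script in this post.
    """

    def __init__(self):
        self._ready = threading.Event()
        self._fd = None

    def set_stream(self, fd):
        # Store the opened stream, then wake up any waiting reader.
        self._fd = fd
        self._ready.set()

    def read(self, length, timeout=None):
        # Wait until set_stream() has been called. Returns None if the
        # timeout expires first, so the caller can tell "no stream yet"
        # apart from an empty read, which means end of stream.
        if not self._ready.wait(timeout):
            return None
        return self._fd.read(length)


if __name__ == "__main__":
    gate = StreamGate()
    gate.set_stream(io.BytesIO(b"abcdef"))
    print(gate.read(4))
```

In a "need-data" callback, a None result would mean "try again later" and an
empty result would mean "emit end-of-stream".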

I have no experience with decodebin, but it should be similar to
uridecodebin. The script below works: it takes data from appsrc and feeds it
to autoaudiosink and autovideosink. Be careful with the pads! I'm not sure
pads are always added in the same order, so it would be wiser to check each
pad's caps to see whether it is audio or video, and then link it to the
matching element.
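
The caps check suggested above could look roughly like this. media_kind and
link_decoded_pad are hypothetical helper names. The pad arguments are assumed
to be Gst.Pad objects from GStreamer 1.0 whose caps carry at least one
structure; only get_current_caps, query_caps, is_linked and link are called
on them, so the sketch itself needs no imports.

```python
def media_kind(caps_name):
    """Return 'audio', 'video' or None for a caps name such as 'audio/x-raw'."""
    if caps_name.startswith("audio/"):
        return "audio"
    if caps_name.startswith("video/"):
        return "video"
    return None


def link_decoded_pad(pad, audio_sink_pad, video_sink_pad):
    """Link a newly added decoder pad to the sink pad that matches its caps.

    Returns the kind that was linked, or None if nothing was linked.
    """
    # Prefer the negotiated caps; fall back to the caps the pad can produce.
    caps = pad.get_current_caps()
    if caps is None:
        caps = pad.query_caps(None)
    kind = media_kind(caps.get_structure(0).get_name())
    target = {"audio": audio_sink_pad, "video": video_sink_pad}.get(kind)
    # Skip unknown media types and sink pads that are already linked.
    if target is None or target.is_linked():
        return None
    pad.link(target)
    return kind
```

Called from a "pad-added" handler, this would replace the comparison against
the pad names "src_0" and "src_1".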

This is a very basic draft that I just extracted from an app still in
development. If anyone sees something to improve, I'm happy to listen: I'm
new to GStreamer.

import time

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)


class PlayerUriDecoodeBin:
    def __init__(self):
        # Open socket (set later through set_stream())
        self.fd = None
        self.pipeline = None
        self.binRecord = None
        self.binTranscripcion = None
        self.binDebug = None

        # Pipeline: the heart of the system. It manages pointers, timing,
        # data flow... Worship the pipeline!
        self.pipeline = Gst.Pipeline()
        # Connect the data-feeding functions to the pipeline.
        # Get the pipeline's main bus and attach signal handlers to it.
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message::eos', self.on_eos)
        self.bus.connect('message::tag', self.on_tag)
        self.bus.connect('message::error', self.on_error)

        # Create elements
        self.srcdec = Gst.ElementFactory.make('uridecodebin')
        self.srcdec.connect("source-setup", self.on_source_setup)
        # self.conv = Gst.ElementFactory.make('audioconvert')
        # self.rsmpl = Gst.ElementFactory.make('audioresample')
        # self.sink = Gst.ElementFactory.make('alsasink')
        self.videosink = Gst.ElementFactory.make('autovideosink')
        self.audiosink = Gst.ElementFactory.make('autoaudiosink')

        # Set the 'uri' property so that uridecodebin uses an appsrc as source
        self.srcdec.set_property('uri', "appsrc://")

        # Connect handler for 'pad-added' signal
        self.srcdec.connect('pad-added', self.on_pad_added)

        # Add elements to pipeline
        self.pipeline.add(self.srcdec)
        # self.pipeline.add(self.conv)
        # self.pipeline.add(self.rsmpl)
        self.pipeline.add(self.videosink)
        self.pipeline.add(self.audiosink)
        # Link *some* elements
        # This is completed in self.on_pad_added()
        # self.conv.link(self.rsmpl)
        # self.rsmpl.link(self.sink)
        # Gst.element_link_many(self.conv, self.rsmpl, self.sink)

        # References used in self.on_pad_added()
        # self.apad = self.conv.get_pad('sink')
        self.apad_audio = self.audiosink.get_static_pad('sink')
        self.apad_video = self.videosink.get_static_pad('sink')

        # And off we go!
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_pad_added(self, element, pad):
        print('on_pad_added:', pad)
        print('target: ' + pad.get_name())
        if pad.get_name() == "src_0":
            if not self.apad_video.is_linked():  # Link only once
                pad.link(self.apad_video)
        if pad.get_name() == "src_1":
            if not self.apad_audio.is_linked():  # Link only once
                pad.link(self.apad_audio)

    def on_source_setup(self, element, source):
        # Once the source is ready, configure it so that it calls
        # self.on_source_need_data whenever it needs data
        print(source)
        source.connect("need-data", self.on_source_need_data)

    def habilitar_lectura_buffer(self):
        self.srcdec.connect("source-setup", self.on_source_setup)
        ##self.bus.connect("need-data", self.on_source_need_data)

    def on_source_need_data(self, source, length):
        # The data source "source" is requesting "length" bytes of data.
        # Try to read them from the stream.
        while self.fd is None:
            # If no stream is ready yet, wait. NOTE: this is a blocking
            # loop, although it should not be a problem because GStreamer
            # runs this in separate threads.
            time.sleep(1)
        try:
            data = self.fd.read(length)
            if not data:
                print("EOS")
                source.emit("end-of-stream")
                return
            # Wrap the Python bytes in a buffer that GStreamer accepts
            # and push it into the appsrc.
            # TODO: check whether this can be improved; it must be "expensive".
            buf = Gst.Buffer.new_wrapped(data)
            source.emit("push-buffer", buf)
        except (IOError, AttributeError) as err:
            print("Error reading data from the stream: {0}".format(err))
            self.exit("Error reading data from the stream: {0}".format(err))

    def on_eos(self, bus, msg):
        print('on_eos')
        self.pipeline.set_state(Gst.State.NULL)

    def on_tag(self, bus, msg):
        taglist = msg.parse_tag()
        print('on_tag:')
        for key in taglist.keys():
            print('\t%s = %s' % (key, taglist[key]))

    def on_error(self, bus, msg):
        error = msg.parse_error()
        print('on_error:', error[1])

    def exit(self, msg):
        self.stop_y_close_buffer()
        print(msg)
        exit(msg)

    def stop(self):
        # Stop playback and exit mainloop
        self.pipeline.set_state(Gst.State.NULL)

    def stop_y_close_buffer(self):
        # Stop playback and exit mainloop
        self.stop()
        # Close the stream
        if self.fd:
            self.fd.close()
            self.fd = None

    def set_stream(self, stream):
        # Attempt to open the stream
        if self.fd is not None:
            try:
                self.stop_y_close_buffer()
            except Exception as err:
                self.exit("Error closing the old stream: {0}".format(err))
        try:
            print("Opening stream")
            self.fd = stream.open()
        # StreamError is not defined in this snippet; it is assumed to come
        # from the library that provides `stream`.
        except StreamError as err:
            self.exit("Failed to open stream: {0}".format(err))

    def play(self):
        self.pipeline.set_state(Gst.State.PLAYING)
        # self.mainloop.run()

    def pausar_todo(self):
        self.pipeline.set_state(Gst.State.PAUSED)

    def reanudar_todo(self):
        self.pipeline.set_state(Gst.State.PLAYING)



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel