Branching pipeline burning memory when restarting

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Branching pipeline burning memory when restarting

Jesper Taxbøl
Hi Guys,

I am working on an application that is supposed to record video from a RTSP camera alongside audio from a local ALSA source into a MP4 container.

I know the MP4 container need an EOS to finalize properly so I send EOS signal through the pipeline (both alsasrc and rtspsrc) and await an EOS recieved event before setting state GST_STATE_NULL.


I interact with the program by sending "start" or "stop" to the gst.fifo file handle.

This works fine and I get nice MP4 files, except that when I want to record a new file I see an ever increased memory consumption giving my application a very limited lifespan. 

I assume its because audio and video is not in sync and some buffering occur. 

Pointers on how I should deal with this problem are very welcome.

kind regards

Jesper


---------------------------

#include <gst/gst.h>
#include <gst/base/gstbaseparse.h>
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <dirent.h>
#include <sqlite3.h>
#include <sys/statvfs.h>
#include <sys/stat.h> 
#include <sys/types.h> 
#include <pthread.h> 

#define RTSP_CAMERA_L "rtsp://192.168.131.176:1001/"
#define RTSP_CAMERA_R "rtsp://192.168.131.176:1002/"
#define BASE_PATH ""

static int current_recording_starttime = 0;

int recording = 0;



//######################################################################
//#
//# Datastructure keeping references to pipeline elements
//#
//######################################################################

typedef struct _CustomData {
    GstElement *pipeline;

    GstElement *rtspsrc_l;
    GstElement *rtph264depay_l;
    GstElement *h264parse_l;
    GstElement *mp4mux_l;
    GstElement *filesink_l;
    
    GstElement *alsasrc;
    GstElement *audioconvert;
    GstElement *faac;
    GstElement *audio_tee;
    GstElement *queue_l;

    GMainLoop *loop;
    GstBus *bus;
} CustomData;
CustomData data;


//######################################################################
//#
//# Bus callback
//#
//######################################################################

int bus_callback(GstBus *bus, GstMessage *msg, gpointer d) {
    GError *err;
    gchar *debug_info;

    switch (GST_MESSAGE_TYPE(msg)) {
        case GST_MESSAGE_ERROR:
            gst_message_parse_error(msg, &err, &debug_info);
            g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
            g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
            g_clear_error(&err);
            g_free(debug_info);
            //g_main_loop_quit(data.loop);
            break;
        case GST_MESSAGE_EOS:
            g_print("End-Of-Stream reached.\n");
            if(gst_element_set_state(data.pipeline, GST_STATE_NULL) == GST_STATE_CHANGE_FAILURE) printf("ERROR A\r\n");
    
            break;
        case GST_MESSAGE_STATE_CHANGED:
            // We are only interested in state-changed messages from the pipeline 
            if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) {
                GstState old_state, new_state, pending_state;
                gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
                g_print("Pipeline state changed from %s to %s:\n",
                        gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
                if(new_state == GST_STATE_PAUSED && old_state == GST_STATE_PLAYING)
                {
                    //printf("AAAAAAAAAAAAAAAAAAAA\r\n");
                    
                }


            } else {
                GstState old_state, new_state, pending_state;
                gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
                //g_print("Element %s state changed from %s to %s:\n", GST_OBJECT_NAME(GST_MESSAGE_SRC(msg)),
                //        gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
            }

            break;
        case GST_MESSAGE_PROGRESS:
            {
                //g_print("Progress notify\r\n");
                GstProgressType type;
                gchar *code, *text;
                gst_message_parse_progress (msg, &type, &code, &text);
                switch (type) {
                    case GST_PROGRESS_TYPE_START:
                        //printf("GST_PROGRESS_TYPE_START\r\n");
                        break;
                    case GST_PROGRESS_TYPE_CONTINUE:
                        //printf("GST_PROGRESS_TYPE_CONTINUE\r\n");
                        break;
                    case GST_PROGRESS_TYPE_COMPLETE:
                        //printf("PROGRESS_TYPE_COMPLETE\r\n");
                        break;
                    case GST_PROGRESS_TYPE_CANCELED:
                        printf("GST_PROGRESS_TYPE_CANCELED\r\n");
                        break;
                    case GST_PROGRESS_TYPE_ERROR:
                        printf("GST_PROGRESS_TYPE_ERROR\r\n");
                        break;
                    default:
                        printf("DEFAULT\r\n");
                        break;
                }
                //printf("Progress: (%s) %s\n", code, text);
                g_free (code);
                g_free (text);
            }
            break;
        case GST_MESSAGE_NEW_CLOCK:
            {
                GstClock *clock;
                gst_message_parse_new_clock (msg, &clock);
                printf("New clock: %s\n", (clock ? GST_OBJECT_NAME (clock) : "NULL"));
                
            }
            break;
        case GST_MESSAGE_STREAM_START:
            {
                guint group_id;
                gst_message_parse_group_id (msg, &group_id);    
                printf("Stream start group id: %d\n", group_id);
            }
            break;
        case GST_MESSAGE_ASYNC_DONE:
            {
                GstClockTime t;
                gst_message_parse_async_done(msg, &t);
                printf("Async done: %u ns\n", t);
            }
            break;
        case GST_MESSAGE_STREAM_STATUS:
            {
                //GstClockTime t;
                //gst_message_parse_stream_status(msg, &t);
                //printf("Stream status\n");
            }
            break;
        case GST_MESSAGE_LATENCY:
            {
                //GstClockTime t;
                //gst_message_parse_stream_status(msg, &t);
                printf("Latency\n");
            }
            break;
            
        default:
            // We should not reach here 
            g_printerr("Unexpected message received. Type: %s\n", GST_MESSAGE_TYPE_NAME(msg));
            
            break;
    }
    return TRUE;
}




//######################################################################
//#
//# Callback for rtsp sources to connect to pipeline
//#
//######################################################################


static void pad_added_handler(GstElement *src, GstPad *new_pad, CustomData* data) 
{
    GstPad *sink_pad;

    sink_pad = gst_element_get_static_pad(data->rtph264depay_l, "sink");
    
    if (gst_pad_is_linked (sink_pad)) 
    {
        g_print ("We are already linked. Ignoring.\n");
        goto exit;
    }

    //g_print("Received new pad '%s' from '%s':\n", GST_PAD_NAME(new_pad), GST_ELEMENT_NAME(src));

    GstCaps *new_pad_caps = gst_pad_get_current_caps(new_pad);
    GstStructure *new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);
    const gchar *new_pad_type = gst_structure_get_name(new_pad_struct);
    
    if (g_str_has_prefix(new_pad_type, "application/x-rtp")) {
        GstPadLinkReturn ret = gst_pad_link (new_pad, sink_pad);
        if (GST_PAD_LINK_FAILED(ret)) {
            g_print("Type is '%s' but link failed.\n", new_pad_type);
        } else {
            g_print("Link succeeded (type '%s') from %s to %s.\n", new_pad_type, GST_PAD_NAME(new_pad), GST_PAD_NAME(sink_pad));
        }
        return;
    }    

exit:
    //Unreference the new pad's caps, if we got them
    if (new_pad_caps != NULL)
    {
        gst_caps_unref(new_pad_caps);
    }
    //Unreference the sink pad
    gst_object_unref(sink_pad);
}

//######################################################################
//#
//# GST_Init
//#
//######################################################################

int initialize()
{
    //Create base pipeline
    char buffer[100];
 
    data.pipeline = gst_pipeline_new("veo-pipeline");
    if (data.pipeline == NULL)
    {
        printf("Pipeline could not be created.");
        return -1;
    }

    //Create all elements - RIGHT
    data.rtspsrc_l = gst_element_factory_make("rtspsrc", "rtspsrc_l");
    data.rtph264depay_l = gst_element_factory_make("rtph264depay", "rtph264depay_l");
    data.h264parse_l = gst_element_factory_make("h264parse", "h264parse_l");
    data.mp4mux_l = gst_element_factory_make("qtmux", "mp4mux_l");
    data.filesink_l = gst_element_factory_make("filesink", "filesink_l");
    
    data.alsasrc = gst_element_factory_make("alsasrc", "alsasrc");
    data.audioconvert = gst_element_factory_make("audioconvert", "audioconvert");
    data.faac = gst_element_factory_make("faac", "faac");
    data.audio_tee = gst_element_factory_make("tee", "audio_tee");
    data.queue_l = gst_element_factory_make("queue", "queue_l");
    

    if ( !data.rtspsrc_l || !data.rtph264depay_l || !data.h264parse_l || 
         !data.mp4mux_l || !data.filesink_l)
    {
        g_printerr("Not all RIGHT elements could be created.\n");
        return -1;
    }


    if ( !data.alsasrc || !data.audioconvert || !data.faac || !data.audio_tee || 
         !data.queue_l)
    {
        g_printerr("Not all AUDIO elements could be created.\n");
        return -1;
    }

    gst_bin_add_many(GST_BIN(data.pipeline),
        data.rtspsrc_l,
        data.rtph264depay_l,
        data.h264parse_l,
        data.mp4mux_l,
        data.filesink_l, NULL);
    
    gst_bin_add_many(GST_BIN(data.pipeline),
        data.alsasrc,
        data.audioconvert,
        data.faac,
        data.audio_tee, 
        data.queue_l, 
        NULL);

    
    //Set parameters
    gst_base_parse_set_pts_interpolation ((GstBaseParse *)(data.h264parse_l),TRUE);
    
    g_object_set(data.alsasrc, "buffer-time", 5 * 1000000, NULL);//5 second buffer


    sprintf(buffer, "%ssmall", RTSP_CAMERA_L);
    g_object_set(data.rtspsrc_l,
        "location", buffer,
        "ntp-sync", TRUE,
        "protocols", (1 << 2), //GST_RTSP_LOWER_TRANS_TCP
        "do-rtsp-keep-alive", FALSE,
        "debug", TRUE,
            NULL);
    

    // Listen to the bus 
    data.bus = gst_element_get_bus(data.pipeline);
    gst_bus_add_watch(data.bus, bus_callback, &data);
    
    //Setup callback to connect rtsp pads to other modules
    g_signal_connect(data.rtspsrc_l, "pad-added", G_CALLBACK(pad_added_handler), &(data));
    

    gst_element_link(data.rtph264depay_l, data.h264parse_l);
    gst_element_link(data.h264parse_l, data.mp4mux_l);
    gst_element_link(data.mp4mux_l, data.filesink_l);
    

    gst_element_link(data.alsasrc, data.audioconvert);
    gst_element_link(data.audioconvert, data.faac);
    gst_element_link(data.faac, data.audio_tee);
    gst_element_link(data.audio_tee, data.queue_l);
    gst_element_link(data.queue_l, data.mp4mux_l);


}



//######################################################################
//#
//# Start recording
//#
//######################################################################



int start() 
{
    char buffer[100];

    printf("#######################################################\r\n");
    printf("Start\r\n");
    printf("#######################################################\r\n");

    if(recording != 0)
    {
        printf("Cant start recording when already recording\r\n");
        return -1;
    }
  
    current_recording_starttime ++;
    
    //Set filesink location names
    sprintf(buffer, "%s%d_l.mp4", BASE_PATH, current_recording_starttime);
    g_object_set(data.filesink_l, "location", buffer, NULL);

    //Start pipeline
    GstPadLinkReturn ret;
    ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        g_printerr("Cant set ready.\n");
        gst_object_unref(data.pipeline);
        return -1;
    }    
    recording = 1;

    return 0;
}

//######################################################################
//#
//# Stop recording 
//#
//######################################################################

int stop() {
    printf("#######################################################\r\n");
    printf("Stop\r\n");
    printf("#######################################################\r\n");
    
    
    if (recording == 0)
    {
        printf("Pipeline must be recording for us to stop it\r\n");
        return -1;
    }
    

    gst_element_send_event(data.rtspsrc_l, gst_event_new_eos());
    gst_element_send_event(data.alsasrc, gst_event_new_eos());
    
    recording = 0;

    
    return 0;
}

//######################################################################
//#
//# FIFO
//#
//######################################################################

int start_counter = 0;
int stop_counter = 0;
#define BUFLEN 128
char buf[BUFLEN];
char* TARGET_START = "start";
char* TARGET_STOP  = "stop";
//The fifo filehandle we are told to start/stop through
//"start" or "stop" is sent
char *fifo_name = "gst.fifo";

pthread_t fifo_thread_id; 

void* pollfifo(void* p) {
    //sleep(5);
    printf("Listen to fifo\r\n");
    //Read from fifo untill empty
    while(1)
    {
        int fifo = open(fifo_name, O_RDONLY);
        char buffer[128];
        int n = read(fifo, buffer, 128);
        if (n > 0) {
            for(int i = 0; i < n; i++)
            {
                if (buffer[i] == TARGET_START[start_counter]) {
                    start_counter++;
                    if (start_counter >= strlen(TARGET_START)) {
                        start_counter = 0;
                        start();
                    }
                } else {
                    start_counter = 0;
                }
                
                if (buffer[i] == TARGET_STOP[stop_counter]) {
                    stop_counter++;
                    if (stop_counter >= strlen(TARGET_STOP)) {
                        stop_counter = 0;
                        stop();
                    }
                } else {
                    stop_counter = 0;
                }
            }
        }
        close(fifo);
    }
}

//######################################################################
//#
//# Main
//#
//######################################################################

int main(int argc, char *argv[]) 
{
    memset(&data, 0, sizeof(CustomData));
    gst_init(&argc, &argv);
    guint major;
    guint minor;
    guint micro;
    guint nano;
    gst_version(&major, &minor, &micro, &nano);
    printf("Recorder\r\n");
    printf("Gstreamer version %d %d %d %d\r\n", major, minor, micro, nano);
    mkfifo(fifo_name, 0666);
    data.loop = g_main_loop_new(NULL, FALSE);    
    initialize();
    pthread_create(&fifo_thread_id, NULL, pollfifo, NULL); 
    g_main_loop_run(data.loop);
    return 0;
}


_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
Reply | Threaded
Open this post in threaded view
|

Re: Branching pipeline burning memory when restarting

Geek-Gst
I roughly remember.
Bug in mp4mux. Mp4mux keeps on accumulating on stream index on every new
segment and freed only on EOS.

identity single-segment=true after decodebin on both video and audio branch
will eat segment. This work around may help you.  



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
Reply | Threaded
Open this post in threaded view
|

Re: Branching pipeline burning memory when restarting

Jesper Taxbøl
I am not sure that is the issue, as I see the memory being burned on restarting the pipeline.

But I will give it a try anyway.

Kind regards

Jesper

Den lør. 26. jan. 2019 kl. 14.12 skrev Geek-Gst <[hidden email]>:
I roughly remember.
Bug in mp4mux. Mp4mux keeps on accumulating on stream index on every new
segment and freed only on EOS.

identity single-segment=true after decodebin on both video and audio branch
will eat segment. This work around may help you. 



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel


--
Jesper Taxbøl
+45 61627501


_______________________________________________
gstreamer-devel mailing list
[hidden email]
https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel