Hi Guys, I am working on an application that is supposed to record video from a RTSP camera alongside audio from a local ALSA source into a MP4 container. I know the MP4 container need an EOS to finalize properly so I send EOS signal through the pipeline (both alsasrc and rtspsrc) and await an EOS recieved event before setting state GST_STATE_NULL. My code is prepended to this mail and inspired from https://gstreamer.freedesktop.org/documentation/tutorials/basic/dynamic-pipelines.html I interact with the program by sending "start" or "stop" to the gst.fifo file handle. This works fine and I get nice MP4 files, except that when I want to record a new file I see an ever increased memory consumption giving my application a very limited lifespan. I assume its because audio and video is not in sync and some buffering occur. Pointers on how I should deal with this problem are very welcome. kind regards Jesper --------------------------- #include <gst/gst.h> #include <gst/base/gstbaseparse.h> #include <stdio.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <dirent.h> #include <sqlite3.h> #include <sys/statvfs.h> #include <sys/stat.h> #include <sys/types.h> #include <pthread.h> #define RTSP_CAMERA_L "rtsp://192.168.131.176:1001/" #define RTSP_CAMERA_R "rtsp://192.168.131.176:1002/" #define BASE_PATH "" static int current_recording_starttime = 0; int recording = 0; //###################################################################### //# //# Datastructure keeping references to pipeline elements //# //###################################################################### typedef struct _CustomData { GstElement *pipeline; GstElement *rtspsrc_l; GstElement *rtph264depay_l; GstElement *h264parse_l; GstElement *mp4mux_l; GstElement *filesink_l; GstElement *alsasrc; GstElement *audioconvert; GstElement *faac; GstElement *audio_tee; GstElement *queue_l; GMainLoop *loop; GstBus *bus; } CustomData; CustomData data; //###################################################################### //# //# Bus callback //# //###################################################################### int bus_callback(GstBus *bus, GstMessage *msg, gpointer d) { GError *err; gchar *debug_info; switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_ERROR: gst_message_parse_error(msg, &err, &debug_info); g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none"); g_clear_error(&err); g_free(debug_info); //g_main_loop_quit(data.loop); break; case GST_MESSAGE_EOS: g_print("End-Of-Stream reached.\n"); if(gst_element_set_state(data.pipeline, GST_STATE_NULL) == GST_STATE_CHANGE_FAILURE) printf("ERROR A\r\n"); break; case GST_MESSAGE_STATE_CHANGED: // We are only interested in state-changed messages from the pipeline if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) { GstState old_state, new_state, pending_state; gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state); g_print("Pipeline state changed from %s to %s:\n", gst_element_state_get_name(old_state), gst_element_state_get_name(new_state)); if(new_state == GST_STATE_PAUSED && old_state == GST_STATE_PLAYING) { //printf("AAAAAAAAAAAAAAAAAAAA\r\n"); } } else { GstState old_state, new_state, pending_state; gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state); //g_print("Element %s state changed from %s to %s:\n", GST_OBJECT_NAME(GST_MESSAGE_SRC(msg)), // gst_element_state_get_name(old_state), gst_element_state_get_name(new_state)); } break; case GST_MESSAGE_PROGRESS: { //g_print("Progress notify\r\n"); GstProgressType type; gchar *code, *text; gst_message_parse_progress (msg, &type, &code, &text); switch (type) { case GST_PROGRESS_TYPE_START: //printf("GST_PROGRESS_TYPE_START\r\n"); break; case GST_PROGRESS_TYPE_CONTINUE: //printf("GST_PROGRESS_TYPE_CONTINUE\r\n"); break; case GST_PROGRESS_TYPE_COMPLETE: //printf("PROGRESS_TYPE_COMPLETE\r\n"); break; case GST_PROGRESS_TYPE_CANCELED: printf("GST_PROGRESS_TYPE_CANCELED\r\n"); break; case GST_PROGRESS_TYPE_ERROR: printf("GST_PROGRESS_TYPE_ERROR\r\n"); break; default: printf("DEFAULT\r\n"); break; } //printf("Progress: (%s) %s\n", code, text); g_free (code); g_free (text); } break; case GST_MESSAGE_NEW_CLOCK: { GstClock *clock; gst_message_parse_new_clock (msg, &clock); printf("New clock: %s\n", (clock ? GST_OBJECT_NAME (clock) : "NULL")); } break; case GST_MESSAGE_STREAM_START: { guint group_id; gst_message_parse_group_id (msg, &group_id); printf("Stream start group id: %d\n", group_id); } break; case GST_MESSAGE_ASYNC_DONE: { GstClockTime t; gst_message_parse_async_done(msg, &t); printf("Async done: %u ns\n", t); } break; case GST_MESSAGE_STREAM_STATUS: { //GstClockTime t; //gst_message_parse_stream_status(msg, &t); //printf("Stream status\n"); } break; case GST_MESSAGE_LATENCY: { //GstClockTime t; //gst_message_parse_stream_status(msg, &t); printf("Latency\n"); } break; default: // We should not reach here g_printerr("Unexpected message received. Type: %s\n", GST_MESSAGE_TYPE_NAME(msg)); break; } return TRUE; } //###################################################################### //# //# Callback for rtsp sources to connect to pipeline //# //###################################################################### static void pad_added_handler(GstElement *src, GstPad *new_pad, CustomData* data) { GstPad *sink_pad; sink_pad = gst_element_get_static_pad(data->rtph264depay_l, "sink"); if (gst_pad_is_linked (sink_pad)) { g_print ("We are already linked. Ignoring.\n"); goto exit; } //g_print("Received new pad '%s' from '%s':\n", GST_PAD_NAME(new_pad), GST_ELEMENT_NAME(src)); GstCaps *new_pad_caps = gst_pad_get_current_caps(new_pad); GstStructure *new_pad_struct = gst_caps_get_structure(new_pad_caps, 0); const gchar *new_pad_type = gst_structure_get_name(new_pad_struct); if (g_str_has_prefix(new_pad_type, "application/x-rtp")) { GstPadLinkReturn ret = gst_pad_link (new_pad, sink_pad); if (GST_PAD_LINK_FAILED(ret)) { g_print("Type is '%s' but link failed.\n", new_pad_type); } else { g_print("Link succeeded (type '%s') from %s to %s.\n", new_pad_type, GST_PAD_NAME(new_pad), GST_PAD_NAME(sink_pad)); } return; } exit: //Unreference the new pad's caps, if we got them if (new_pad_caps != NULL) { gst_caps_unref(new_pad_caps); } //Unreference the sink pad gst_object_unref(sink_pad); } //###################################################################### //# //# GST_Init //# //###################################################################### int initialize() { //Create base pipeline char buffer[100]; data.pipeline = gst_pipeline_new("veo-pipeline"); if (data.pipeline == NULL) { printf("Pipeline could not be created."); return -1; } //Create all elements - RIGHT data.rtspsrc_l = gst_element_factory_make("rtspsrc", "rtspsrc_l"); data.rtph264depay_l = gst_element_factory_make("rtph264depay", "rtph264depay_l"); data.h264parse_l = gst_element_factory_make("h264parse", "h264parse_l"); data.mp4mux_l = gst_element_factory_make("qtmux", "mp4mux_l"); data.filesink_l = gst_element_factory_make("filesink", "filesink_l"); data.alsasrc = gst_element_factory_make("alsasrc", "alsasrc"); data.audioconvert = gst_element_factory_make("audioconvert", "audioconvert"); data.faac = gst_element_factory_make("faac", "faac"); data.audio_tee = gst_element_factory_make("tee", "audio_tee"); data.queue_l = gst_element_factory_make("queue", "queue_l"); if ( !data.rtspsrc_l || !data.rtph264depay_l || !data.h264parse_l || !data.mp4mux_l || !data.filesink_l) { g_printerr("Not all RIGHT elements could be created.\n"); return -1; } if ( !data.alsasrc || !data.audioconvert || !data.faac || !data.audio_tee || !data.queue_l) { g_printerr("Not all AUDIO elements could be created.\n"); return -1; } gst_bin_add_many(GST_BIN(data.pipeline), data.rtspsrc_l, data.rtph264depay_l, data.h264parse_l, data.mp4mux_l, data.filesink_l, NULL); gst_bin_add_many(GST_BIN(data.pipeline), data.alsasrc, data.audioconvert, data.faac, data.audio_tee, data.queue_l, NULL); //Set parameters gst_base_parse_set_pts_interpolation ((GstBaseParse *)(data.h264parse_l),TRUE); g_object_set(data.alsasrc, "buffer-time", 5 * 1000000, NULL);//5 second buffer sprintf(buffer, "%ssmall", RTSP_CAMERA_L); g_object_set(data.rtspsrc_l, "location", buffer, "ntp-sync", TRUE, "protocols", (1 << 2), //GST_RTSP_LOWER_TRANS_TCP "do-rtsp-keep-alive", FALSE, "debug", TRUE, NULL); // Listen to the bus data.bus = gst_element_get_bus(data.pipeline); gst_bus_add_watch(data.bus, bus_callback, &data); //Setup callback to connect rtsp pads to other modules g_signal_connect(data.rtspsrc_l, "pad-added", G_CALLBACK(pad_added_handler), &(data)); gst_element_link(data.rtph264depay_l, data.h264parse_l); gst_element_link(data.h264parse_l, data.mp4mux_l); gst_element_link(data.mp4mux_l, data.filesink_l); gst_element_link(data.alsasrc, data.audioconvert); gst_element_link(data.audioconvert, data.faac); gst_element_link(data.faac, data.audio_tee); gst_element_link(data.audio_tee, data.queue_l); gst_element_link(data.queue_l, data.mp4mux_l); } //###################################################################### //# //# Start recording //# //###################################################################### int start() { char buffer[100]; printf("#######################################################\r\n"); printf("Start\r\n"); printf("#######################################################\r\n"); if(recording != 0) { printf("Cant start recording when already recording\r\n"); return -1; } current_recording_starttime ++; //Set filesink location names sprintf(buffer, "%s%d_l.mp4", BASE_PATH, current_recording_starttime); g_object_set(data.filesink_l, "location", buffer, NULL); //Start pipeline GstPadLinkReturn ret; ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING); if (ret == GST_STATE_CHANGE_FAILURE) { g_printerr("Cant set ready.\n"); gst_object_unref(data.pipeline); return -1; } recording = 1; return 0; } //###################################################################### //# //# Stop recording //# //###################################################################### int stop() { printf("#######################################################\r\n"); printf("Stop\r\n"); printf("#######################################################\r\n"); if (recording == 0) { printf("Pipeline must be recording for us to stop it\r\n"); return -1; } gst_element_send_event(data.rtspsrc_l, gst_event_new_eos()); gst_element_send_event(data.alsasrc, gst_event_new_eos()); recording = 0; return 0; } //###################################################################### //# //# FIFO //# //###################################################################### int start_counter = 0; int stop_counter = 0; #define BUFLEN 128 char buf[BUFLEN]; char* TARGET_START = "start"; char* TARGET_STOP = "stop"; //The fifo filehandle we are told to start/stop through //"start" or "stop" is sent char *fifo_name = "gst.fifo"; pthread_t fifo_thread_id; void* pollfifo(void* p) { //sleep(5); printf("Listen to fifo\r\n"); //Read from fifo untill empty while(1) { int fifo = open(fifo_name, O_RDONLY); char buffer[128]; int n = read(fifo, buffer, 128); if (n > 0) { for(int i = 0; i < n; i++) { if (buffer[i] == TARGET_START[start_counter]) { start_counter++; if (start_counter >= strlen(TARGET_START)) { start_counter = 0; start(); } } else { start_counter = 0; } if (buffer[i] == TARGET_STOP[stop_counter]) { stop_counter++; if (stop_counter >= strlen(TARGET_STOP)) { stop_counter = 0; stop(); } } else { stop_counter = 0; } } } close(fifo); } } //###################################################################### //# //# Main //# //###################################################################### int main(int argc, char *argv[]) { memset(&data, 0, sizeof(CustomData)); gst_init(&argc, &argv); guint major; guint minor; guint micro; guint nano; gst_version(&major, &minor, µ, &nano); printf("Recorder\r\n"); printf("Gstreamer version %d %d %d %d\r\n", major, minor, micro, nano); mkfifo(fifo_name, 0666); data.loop = g_main_loop_new(NULL, FALSE); initialize(); pthread_create(&fifo_thread_id, NULL, pollfifo, NULL); g_main_loop_run(data.loop); return 0; } _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
I roughly remember.
Bug in mp4mux. Mp4mux keeps on accumulating on stream index on every new segment and freed only on EOS. identity single-segment=true after decodebin on both video and audio branch will eat segment. This work around may help you. -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
I am not sure that is the issue, as I see the memory being burned on restarting the pipeline. But I will give it a try anyway. Kind regards Jesper Den lør. 26. jan. 2019 kl. 14.12 skrev Geek-Gst <[hidden email]>: I roughly remember. Jesper Taxbøl
+45 61627501 _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
Free forum by Nabble | Edit this page |