Hi guys,
I am a beginner to GStreamer. Recently i'm trying to understand the mechanisme of multithreads in GStreamer. Say that i have several independant video sources, i want to realize the following functions: 1. Visualise these video sources in paralle. 2. If necessary, pause one of the sources and let the others continue the display. Then based on this principe I've written these following codes: ``` static gboolean push_data(GstSession *p_gstSession){ static gboolean white = FALSE; GstBuffer *buffer; size_t size = 0; GstFlowReturn ret = GST_FLOW_OK; GstMapInfo info; //Creation of a random perturbation to check out the independance of each thread /* int v1 = rand() % 10000000; g_print (" %i\n", v1); if (v1 > 9999998) { g_print("SLEEP !"); g_usleep(20000000); } */ CVideoData *videoData = (CVideoData*) p_gstSession->inputFifo->Pop(); if (videoData && videoData !=NULL ) { size = videoData->GetSize(); buffer = gst_buffer_new_allocate(NULL, size, NULL); gst_buffer_map(buffer, &info, GST_MAP_WRITE); GST_BUFFER_PTS (buffer) = p_gstSession->timestamp; // Ne semble pas avoir d'impact, à cause de la soure live ? GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale_int (1, GST_SECOND, 90000); p_gstSession->timestamp += GST_BUFFER_DURATION (buffer); memcpy(info.data, videoData->GetData(), size); gst_buffer_unmap(buffer, &info); g_signal_emit_by_name (p_gstSession->source, "push-buffer", buffer, &ret); p_gstSession->num_packet++; gst_buffer_unref(buffer); } if (ret != GST_FLOW_OK) { /* something wrong, stop pushing */ g_main_loop_quit (p_gstSession->main_loop); } return TRUE; } static void start_feed (GstElement * pipeline, guint size, GstSession *gstSession) { if (gstSession->sourceid == 0) { g_print ("start feeding at packet %i\n", gstSession->num_packet); gstSession->sourceid = g_idle_add ((GSourceFunc) push_data, gstSession); } } /* This callback is called when appsrc has enough data and we can stop sending. * We remove the idle handler from the mainloop */ static void stop_feed (GstElement * pipeline,GstSession *gstSession) { if (gstSession->sourceid != 0) { g_print ("stop feeding at packet %i\n", gstSession->num_packet); g_source_remove (gstSession->sourceid); gstSession->sourceid = 0; } } static gboolean on_pipeline_message (GstBus * bus, GstMessage * message, GstSession *gstSession) { GstState state, pending; switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS: g_print ("Received End of Stream message\n"); g_main_loop_quit (gstSession->main_loop); break; case GST_MESSAGE_ERROR: { g_print ("Received error\n"); GError *err = NULL; gchar *dbg_info = NULL; gst_message_parse_error (message, &err, &dbg_info); g_printerr ("ERROR from element %s: %s\n", GST_OBJECT_NAME (message->src), err->message); g_printerr ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none"); g_error_free (err); g_free (dbg_info); } g_main_loop_quit (gstSession->main_loop); break; case GST_MESSAGE_STATE_CHANGED: gst_element_get_state(gstSession->source, &state, &pending, GST_CLOCK_TIME_NONE); /* g_print ("State changed from %i to %i\n", state, pending); */ break; default: break; } return TRUE; } int CSimpleUdpDisplayGstPipeline::start() { int argc = 1; char *dummy_args[] = {NULL}; GstBus *bus = NULL; char **argv = dummy_args; gst_init(&argc, &argv); gstSession.pipeline = gst_pipeline_new("pipeline"); // if (enable_appsrc) { gstSession.source = gst_element_factory_make("appsrc", "appsrc"); bus = gst_element_get_bus (gstSession.pipeline); gst_bus_add_watch (bus, (GstBusFunc) on_pipeline_message, &gstSession); gst_object_unref (bus); /* configure the appsrc, we will push data into the appsrc from the * mainloop */ g_signal_connect (gstSession.source, "need-data", G_CALLBACK (start_feed), &gstSession); g_signal_connect (gstSession.source, "enough-data", G_CALLBACK (stop_feed), &gstSession); g_object_set (gstSession.source, "is-live", true, NULL); g_object_set (gstSession.source, "do-timestamp", false, NULL); g_object_set (gstSession.source, "format", GST_FORMAT_TIME, NULL); gstSession.jitterbuffer = gst_element_factory_make("rtpjitterbuffer", "rtpjitterbuffer"); if (video_format == "H264") { gstSession.payload_depay = gst_element_factory_make("rtph264depay", "depay"); gstSession.decoder = gst_element_factory_make("avdec_h264", "decoder"); } else if (video_format=="MP4V-ES"){ gstSession.payload_depay = gst_element_factory_make("rtpmp4vdepay", "depay"); gstSession.decoder = gst_element_factory_make("avdec_mpeg4", "decoder"); } gstSession.videoconvert = gst_element_factory_make("videoconvert", "videoconvert"); gstSession.videoscale = gst_element_factory_make("videoscale", "videoscale"); gstSession.display = gst_element_factory_make("xvimagesink", "ximagesink"); gstSession.queue = gst_element_factory_make("queue", "queue"); gstSession.timeoverlay = gst_element_factory_make("timeoverlay", "timeoverlay"); gstSession.capsfilter = gst_element_factory_make("capsfilter", "capsfilter"); GstCaps *caps = gst_caps_new_simple("application/x-rtp", "media", G_TYPE_STRING, "video", "clock-rate", G_TYPE_INT, clock_rate, "encoding-name", G_TYPE_STRING, video_format.c_str(), "profile-level-id", G_TYPE_STRING, pli.c_str(), "sprop-parameter-sets", G_TYPE_STRING, sps.c_str(), NULL); if (!gstSession.source || !gstSession.payload_depay || !gstSession.jitterbuffer || !gstSession.capsfilter || !gstSession.decoder || !gstSession.videoconvert || !gstSession.videoscale || !gstSession.display || !gstSession.timeoverlay) { g_printerr("One gst element could not be created.\n"); return 1; } g_object_set(G_OBJECT(gstSession.capsfilter), "caps", caps, NULL); g_object_set(G_OBJECT(gstSession.jitterbuffer), "mode", 0, NULL); gst_bin_add_many(GST_BIN(gstSession.pipeline), gstSession.source, gstSession.capsfilter, gstSession.jitterbuffer, gstSession.payload_depay, gstSession.decoder, gstSession.timeoverlay, gstSession.videoconvert, gstSession.videoscale, gstSession.display, gstSession.queue, NULL); if (!gst_element_link_many(gstSession.source, gstSession.capsfilter, gstSession.jitterbuffer, gstSession.payload_depay, gstSession.decoder, gstSession.timeoverlay, gstSession.videoconvert, gstSession.videoscale, gstSession.display, NULL)) { g_printerr("Erreur lors du linkage du pipeline principale (flux camera)\n"); }; gstSession.main_loop = g_main_loop_new(NULL, FALSE); gst_element_set_state(gstSession.pipeline, GST_STATE_PLAYING); g_print("Lancement de GstreamerVisualiser... \n"); g_main_loop_run(gstSession.main_loop); gst_element_set_state(gstSession.pipeline, GST_STATE_NULL); gst_object_unref(GST_OBJECT(gstSession.pipeline)); return 0; } ``` So basically i wanted to create for each video source an independante pipeline to make them on display mode in paralle, So what I've done was to call the start() methode several times in my main function to create several threads. Finally I've successfully got the displays for all video sources. But when I uncomment the random perturbation function in the methode push_data(), all the sources seemed to be influenced and jumpe in to the sleep mode, which mean that all displays are frozen. So is there any way to stopping one video source injecting while let the other sources continue to push data? Do i need to write for each pipeline an independant push_data()mehtode to avoid the global perturbation? -- Sent from: http://gstreamer-devel.966125.n4.nabble.com/ _______________________________________________ gstreamer-devel mailing list [hidden email] https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel |
Free forum by Nabble | Edit this page |