#include #include #include #include #include #include #include #include #include #include #include #include #include // Add this for PATH_MAX #include "nvbufsurface.h" #include "gstnvdsmeta.h" #include "nvds_analytics_meta.h" #include "nvds_obj_encode.h" #include "nvds_yml_parser.h" #include "gst-nvmessage.h" #include "nvdsmeta_schema.h" // Added for event message metadata #include #include // for mkdir #include #include #include #include "ocr_triton_client.h" static OCRTritonClient *g_ocr_client = nullptr; namespace fs = std::experimental::filesystem; // Define constants - avoid conflicts with existing macros #ifndef MAX_DISPLAY_LEN #define MAX_DISPLAY_LEN 64 #endif #ifndef MAX_TIME_STAMP_LEN #define MAX_TIME_STAMP_LEN 32 #endif #ifndef MAX_SENSOR_STR_LEN #define MAX_SENSOR_STR_LEN 64 #endif #ifndef MAX_LABEL_SIZE #define MAX_LABEL_SIZE 128 #endif const char *TRACKER_CONFIG_FILE = "configs_seg/tracker_config.txt"; #define PGIE_CLASS_ID_VEHICLE 3 #define PGIE_CLASS_ID_PERSON 0 #define OSD_PROCESS_MODE 1 #define OSD_DISPLAY_TEXT 1 #define MUXER_OUTPUT_WIDTH 1280 #define MUXER_OUTPUT_HEIGHT 720 #define MUXER_BATCH_TIMEOUT_USEC 4000000 #define TILED_OUTPUT_WIDTH 1280 #define TILED_OUTPUT_HEIGHT 720 #define save_img TRUE #define attach_user_meta TRUE #define GST_CAPS_FEATURES_NVMM "memory:NVMM" #ifndef FILE_NAME_SIZE #define FILE_NAME_SIZE 256 #endif gint frame_number = 0, frame_count = 0; std::vector read_class_label(const std::string &filename) { std::vector labels; std::ifstream infile(filename); std::string line; while (std::getline(infile, line)) { if (!line.empty()) { labels.push_back(line); } } return labels; } bool ensure_directory(const std::string &dir) { try { if (!fs::exists(dir)) { if (fs::create_directories(dir)) { std::cout << "Created directory: " << dir << std::endl; } else { std::cerr << "Failed to created directory: " << dir << std::endl; return false; } } return true; } catch (const fs::filesystem_error &e) { std::cerr << "Fileystem Error : " << e.what() << std::endl; return false; } } /* Function to generate RFC3339 timestamp */ static void generate_ts_rfc3339(char *buf, int buf_size) { time_t tloc; struct tm tm_log; struct timespec ts; char strmsec[6]; clock_gettime(CLOCK_REALTIME, &ts); memcpy(&tloc, (void *)(&ts.tv_sec), sizeof(time_t)); gmtime_r(&tloc, &tm_log); strftime(buf, buf_size, "%Y-%m-%dT%H:%M:%S", &tm_log); int ms = ts.tv_nsec / 1000000; g_snprintf(strmsec, sizeof(strmsec), ".%.3dZ", ms); strncat(buf, strmsec, buf_size); } /* Meta data copy function set by user */ static gpointer meta_copy_func(gpointer data, gpointer user_data) { NvDsUserMeta *user_meta = (NvDsUserMeta *)data; NvDsEventMsgMeta *srcMeta = (NvDsEventMsgMeta *)user_meta->user_meta_data; NvDsEventMsgMeta *dstMeta = NULL; // Use g_memdup2 instead of deprecated g_memdup dstMeta = (NvDsEventMsgMeta *)g_memdup(srcMeta, sizeof(NvDsEventMsgMeta)); if (srcMeta->ts) dstMeta->ts = g_strdup(srcMeta->ts); if (srcMeta->sensorStr) dstMeta->sensorStr = g_strdup(srcMeta->sensorStr); if (srcMeta->objectId) dstMeta->objectId = g_strdup(srcMeta->objectId); return dstMeta; } /* Meta data release function set by user */ static void meta_free_func(gpointer data, gpointer user_data) { NvDsUserMeta *user_meta = (NvDsUserMeta *)data; NvDsEventMsgMeta *srcMeta = (NvDsEventMsgMeta *)user_meta->user_meta_data; if (srcMeta->ts) { g_free(srcMeta->ts); } if (srcMeta->sensorStr) { g_free(srcMeta->sensorStr); } if (srcMeta->objectId) { g_free(srcMeta->objectId); } g_free(srcMeta); } namespace { std::string pgie_classes_str[6] = {"Person", "bicycle", "car", "motorcycle", "airplane", "bus"}; std::vector class_labels = read_class_label("configs_seg/primary_detection_seg/gloves_seg_v1/labels.txt"); bool PERF_MODE = false; GstElement *nvvidconv1 = nullptr; GstElement *filter1 = nullptr; GstElement *nvvidconv2 = nullptr; GstElement *filter2 = nullptr; GstElement *nvv4l2h264enc = nullptr; GstElement *parse = nullptr; GstElement *rtppay = nullptr; guint udp_port = 5400; GstRTSPServer *server = nullptr; } void process_encoded_buffer(uint8_t *buffer, uint32_t length, const char *class_name) { std::cout << "🔄 Processing encoded buffer for " << class_name << " (size: " << length << " bytes)" << std::endl; // Here you can: // 1. Send buffer over network // 2. Store in database // 3. Analyze the image // 4. Apply additional processing // etc. } bool safe_process_ocr(const uint8_t* buffer, size_t buffer_size, const char* class_name, guint64 object_id, gint frame_num) { if (!g_ocr_client || !buffer || buffer_size == 0) { std::cerr << "❌ Invalid OCR parameters" << std::endl; return false; } try { std::cout << "🔍 Processing " << class_name << " crop (ID: " << object_id << ", Frame: " << frame_num << ", Size: " << buffer_size << " bytes)" << std::endl; // Create a copy of the buffer to avoid any memory issues std::vector buffer_copy(buffer, buffer + buffer_size); auto [detected_texts, inference_time] = g_ocr_client->getOCRTextFromBuffer( buffer_copy.data(), buffer_copy.size() ); // Print OCR results std::cout << "📝 OCR Results:" << std::endl; std::cout << " Class: " << class_name << std::endl; std::cout << " Object ID: " << object_id << std::endl; std::cout << " Frame: " << frame_num << std::endl; std::cout << " Inference time: " << inference_time << " ms" << std::endl; if (detected_texts.empty()) { std::cout << " ❌ No text detected" << std::endl; } else { std::cout << " ✅ Detected " << detected_texts.size() << " text element(s):" << std::endl; for (size_t i = 0; i < detected_texts.size(); ++i) { std::cout << " [" << (i + 1) << "] \"" << detected_texts[i] << "\"" << std::endl; } } std::cout << std::string(60, '=') << std::endl; return true; } catch (const std::exception& e) { std::cerr << "❌ OCR processing failed for " << class_name << ": " << e.what() << std::endl; return false; } catch (...) { std::cerr << "❌ Unknown OCR processing error for " << class_name << std::endl; return false; } } static GstPadProbeReturn nvdsanalytics_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data) { GstBuffer *buf = (GstBuffer *)info->data; GstMapInfo inmap = GST_MAP_INFO_INIT; NvDsObjEncCtxHandle enc_ctx = static_cast(u_data); const char *sensor_id = "sensor_0"; if (!gst_buffer_map(buf, &inmap, GST_MAP_READ)) { GST_ERROR("Failed to map GstBuffer."); return GST_PAD_PROBE_OK; } NvBufSurface *ip_surf = (NvBufSurface *)inmap.data; gst_buffer_unmap(buf, &inmap); NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf); if (!batch_meta) return GST_PAD_PROBE_OK; // =========================================== // PASS 1: DETECT AND ENCODE // =========================================== std::vector line_crossing_objects; for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next) { NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)l_frame->data; int source_id = frame_meta->source_id; for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next) { NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)l_obj->data; bool crossed_line = false; // Check for line crossing for (NvDsMetaList *l_user = obj_meta->obj_user_meta_list; l_user != NULL; l_user = l_user->next) { NvDsUserMeta *user_meta = (NvDsUserMeta *)l_user->data; if (user_meta->base_meta.meta_type == nvds_get_user_meta_type((gchar *)"NVIDIA.DSANALYTICSOBJ.USER_META")) { NvDsAnalyticsObjInfo *analytics = (NvDsAnalyticsObjInfo *)user_meta->user_meta_data; if (!analytics->lcStatus.empty()) { crossed_line = true; line_crossing_objects.push_back(obj_meta->object_id); break; } } } if (crossed_line) { NvDsEventMsgMeta *msg_meta = (NvDsEventMsgMeta *)g_malloc0(sizeof(NvDsEventMsgMeta)); msg_meta->type = NVDS_EVENT_CUSTOM; msg_meta->objType = NVDS_OBJECT_TYPE_CUSTOM; msg_meta->bbox.left = obj_meta->rect_params.left; msg_meta->bbox.top = obj_meta->rect_params.top; msg_meta->bbox.width = obj_meta->rect_params.width; msg_meta->bbox.height = obj_meta->rect_params.height; msg_meta->frameId = frame_number; msg_meta->trackingId = obj_meta->object_id; msg_meta->confidence = obj_meta->confidence; // Allocate and set timestamp msg_meta->ts = (gchar *)g_malloc0(MAX_TIME_STAMP_LEN + 1); generate_ts_rfc3339(msg_meta->ts, MAX_TIME_STAMP_LEN); // Allocate and set sensor string msg_meta->sensorStr = (gchar *)g_malloc0(MAX_SENSOR_STR_LEN); g_strlcpy(msg_meta->sensorStr, sensor_id, MAX_SENSOR_STR_LEN); // Allocate and set object ID msg_meta->objectId = (gchar *)g_malloc0(MAX_LABEL_SIZE); // Set video path msg_meta->videoPath = g_strdup_printf("%d", source_id); // Get class name const char *class_name = (obj_meta->class_id < class_labels.size()) ? class_labels[obj_meta->class_id].c_str() : pgie_classes_str[obj_meta->class_id].c_str(); g_strlcpy(msg_meta->objectId, class_name, MAX_LABEL_SIZE); std::cout << "📍 Line crossing detected for " << class_name << " (ID: " << obj_meta->object_id << ")" << std::endl; // Create user meta for the event message NvDsUserMeta *user_event_meta = nvds_acquire_user_meta_from_pool(batch_meta); if (user_event_meta) { user_event_meta->user_meta_data = (void *)msg_meta; user_event_meta->base_meta.meta_type = NVDS_EVENT_MSG_META; user_event_meta->base_meta.copy_func = (NvDsMetaCopyFunc)meta_copy_func; user_event_meta->base_meta.release_func = (NvDsMetaReleaseFunc)meta_free_func; // Attach to frame meta nvds_add_user_meta_to_frame(frame_meta, user_event_meta); std::cout << "🚀 Sent metadata to Kafka: " << class_name << " (conf: " << obj_meta->confidence << ")" << std::endl; } else { g_print("Error in attaching event meta to buffer\n"); // Clean up allocated memory on failure if (msg_meta->ts) g_free(msg_meta->ts); if (msg_meta->sensorStr) g_free(msg_meta->sensorStr); if (msg_meta->objectId) g_free(msg_meta->objectId); if (msg_meta->videoPath) g_free(msg_meta->videoPath); g_free(msg_meta); } // Prepare encoding parameters NvDsObjEncUsrArgs objData = {0}; objData.saveImg = TRUE; objData.attachUsrMeta = TRUE; objData.scaleImg = FALSE; objData.scaledWidth = 0; objData.scaledHeight = 0; objData.objNum = 1; objData.quality = 90; nvds_obj_enc_process(enc_ctx, &objData, ip_surf, obj_meta, frame_meta); } } } nvds_obj_enc_finish(enc_ctx); // =========================================== // PASS 2: PROCESS OCR (SIMPLIFIED) // =========================================== for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next) { NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)l_frame->data; int source_id = frame_meta->source_id; for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next) { NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)l_obj->data; // Check if this object had line crossing bool should_process = false; for (guint64 id : line_crossing_objects) { if (id == obj_meta->object_id) { should_process = true; break; } } if (!should_process) continue; // Look for crop metadata for (NvDsMetaList *l_user = obj_meta->obj_user_meta_list; l_user != NULL; l_user = l_user->next) { NvDsUserMeta *user_meta = (NvDsUserMeta *)l_user->data; if (user_meta->base_meta.meta_type == NVDS_CROP_IMAGE_META) { NvDsObjEncOutParams *enc_jpeg_image = (NvDsObjEncOutParams *)user_meta->user_meta_data; if (enc_jpeg_image && enc_jpeg_image->outBuffer && enc_jpeg_image->outLen > 0) { const char *class_name = (obj_meta->class_id < class_labels.size()) ? class_labels[obj_meta->class_id].c_str() : "object"; // Process OCR safely // safe_process_ocr( // reinterpret_cast(enc_jpeg_image->outBuffer), // enc_jpeg_image->outLen, // class_name, // obj_meta->object_id, // frame_number // ); } break; } } } } frame_number++; return GST_PAD_PROBE_OK; } static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data) { GMainLoop *loop = static_cast(data); switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_EOS: std::cout << "End of stream" << std::endl; g_main_loop_quit(loop); break; case GST_MESSAGE_WARNING: { gchar *debug; GError *error; gst_message_parse_warning(msg, &error, &debug); std::cerr << "WARNING from element " << GST_OBJECT_NAME(msg->src) << ": " << error->message << std::endl; g_free(debug); std::cerr << "Warning: " << error->message << std::endl; g_error_free(error); break; } case GST_MESSAGE_ERROR: { gchar *debug; GError *error; gst_message_parse_error(msg, &error, &debug); std::cerr << "ERROR from element " << GST_OBJECT_NAME(msg->src) << ": " << error->message << std::endl; if (debug) std::cerr << "Error details: " << debug << std::endl; g_free(debug); g_error_free(error); g_main_loop_quit(loop); break; } case GST_MESSAGE_ELEMENT: { if (gst_nvmessage_is_stream_eos(msg)) { guint stream_id; if (gst_nvmessage_parse_stream_eos(msg, &stream_id)) { std::cout << "Got EOS from stream " << stream_id << std::endl; } } break; } default: break; } return TRUE; } #define CHECK_ERROR(error) \ if (error) \ { \ g_printerr("Error while parsing config file: %s\n", error->message); \ goto done; \ } const char *CONFIG_GROUP_TRACKER = "tracker"; const char *CONFIG_GROUP_TRACKER_WIDTH = "tracker-width"; const char *CONFIG_GROUP_TRACKER_HEIGHT = "tracker-height"; const char *CONFIG_GROUP_TRACKER_LL_CONFIG_FILE = "ll-config-file"; const char *CONFIG_GROUP_TRACKER_LL_LIB_FILE = "ll-lib-file"; const char *CONFIG_GPU_ID = "gpu-id"; static std::string get_absolute_file_path(const std::string &cfg_file_path, const std::string &file_path) { char abs_cfg_path[PATH_MAX + 1]; if (!file_path.empty() && file_path[0] == '/') { return file_path; } if (!realpath(cfg_file_path.c_str(), abs_cfg_path)) { return ""; } // Return absolute path of config file if file_path is NULL. if (file_path.empty()) { return abs_cfg_path; } std::string dir_path(abs_cfg_path); size_t last_slash = dir_path.find_last_of('/'); if (last_slash != std::string::npos) { dir_path = dir_path.substr(0, last_slash + 1); } return dir_path + file_path; } static void cb_newpad(GstElement *decodebin, GstPad *decoder_src_pad, gpointer data) { GstCaps *caps = gst_pad_get_current_caps(decoder_src_pad); if (!caps) { caps = gst_pad_query_caps(decoder_src_pad, nullptr); } const GstStructure *str = gst_caps_get_structure(caps, 0); const gchar *name = gst_structure_get_name(str); GstElement *source_bin = static_cast(data); GstCapsFeatures *features = gst_caps_get_features(caps, 0); /* Need to check if the pad created by the decodebin is for video and not * audio. */ if (!strncmp(name, "video", 5)) { /* Link the decodebin pad only if decodebin has picked nvidia * decoder plugin nvdec_*. We do this by checking if the pad caps contain * NVMM memory features. */ if (gst_caps_features_contains(features, GST_CAPS_FEATURES_NVMM)) { /* Get the source bin ghost pad */ GstPad *bin_ghost_pad = gst_element_get_static_pad(source_bin, "src"); if (!gst_ghost_pad_set_target(GST_GHOST_PAD(bin_ghost_pad), decoder_src_pad)) { std::cerr << "Failed to link decoder src pad to source bin ghost pad" << std::endl; } gst_object_unref(bin_ghost_pad); } else { std::cerr << "Error: Decodebin did not pick nvidia decoder plugin." << std::endl; } } gst_caps_unref(caps); } static void decodebin_child_added(GstChildProxy *child_proxy, GObject *object, gchar *name, gpointer user_data) { std::cout << "Decodebin child added: " << name << std::endl; if (g_strrstr(name, "decodebin") == name) { g_signal_connect(G_OBJECT(object), "child-added", G_CALLBACK(decodebin_child_added), user_data); } } static GstElement * create_source_bin(guint index, gchar *uri) { GstElement *bin = nullptr, *uri_decode_bin = nullptr; std::string bin_name = "source-bin-" + std::to_string(index); bin = gst_bin_new(bin_name.c_str()); if (PERF_MODE) { uri_decode_bin = gst_element_factory_make("nvurisrcbin", "uri-decode-bin"); g_object_set(G_OBJECT(uri_decode_bin), "file-loop", TRUE, nullptr); } else { uri_decode_bin = gst_element_factory_make("uridecodebin", "uri-decode-bin"); } if (!bin || !uri_decode_bin) { std::cerr << "One element in source bin could not be created." << std::endl; return nullptr; } /* We set the input uri to the source element */ g_object_set(G_OBJECT(uri_decode_bin), "uri", uri, nullptr); /* Connect to the "pad-added" signal of the decodebin which generates a * callback once a new pad for raw data has beed created by the decodebin */ g_signal_connect(G_OBJECT(uri_decode_bin), "pad-added", G_CALLBACK(cb_newpad), bin); g_signal_connect(G_OBJECT(uri_decode_bin), "child-added", G_CALLBACK(decodebin_child_added), bin); gst_bin_add(GST_BIN(bin), uri_decode_bin); if (!gst_element_add_pad(bin, gst_ghost_pad_new_no_target("src", GST_PAD_SRC))) { std::cerr << "Failed to add ghost pad in source bin" << std::endl; return nullptr; } return bin; } static gboolean start_rtsp_streaming(guint rtsp_port_num, guint updsink_port_num, guint64 udp_buffer_size) { GstRTSPMountPoints *mounts; GstRTSPMediaFactory *factory; std::string udpsrc_pipeline; std::string port_num_str; if (udp_buffer_size == 0) udp_buffer_size = 512 * 1024; udpsrc_pipeline = "( udpsrc name=pay0 port=" + std::to_string(updsink_port_num) + " buffer-size=" + std::to_string(udp_buffer_size) + " caps=\"application/x-rtp, media=video, " "clock-rate=90000, encoding-name=H264, payload=96 \" )"; port_num_str = std::to_string(rtsp_port_num); server = gst_rtsp_server_new(); g_object_set(server, "service", port_num_str.c_str(), nullptr); mounts = gst_rtsp_server_get_mount_points(server); factory = gst_rtsp_media_factory_new(); gst_rtsp_media_factory_set_launch(factory, udpsrc_pipeline.c_str()); gst_rtsp_mount_points_add_factory(mounts, "/ds-test", factory); g_object_unref(mounts); gst_rtsp_server_attach(server, nullptr); std::cout << "\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:" << rtsp_port_num << "/ds-test ***\n" << std::endl; return TRUE; } static GstRTSPFilterResult client_filter(GstRTSPServer *server, GstRTSPClient *client, gpointer user_data) { return GST_RTSP_FILTER_REMOVE; } static void destroy_sink_bin() { if (!server) return; GstRTSPMountPoints *mounts; GstRTSPSessionPool *pool; mounts = gst_rtsp_server_get_mount_points(server); gst_rtsp_mount_points_remove_factory(mounts, "/ds-test"); g_object_unref(mounts); gst_rtsp_server_client_filter(server, client_filter, nullptr); pool = gst_rtsp_server_get_session_pool(server); gst_rtsp_session_pool_cleanup(pool); g_object_unref(pool); } static bool set_tracker_properties(GstElement *nvtracker) { bool ret = false; GError *error = nullptr; gchar **keys = nullptr; // Create key file for parsing config GKeyFile *key_file = g_key_file_new(); if (!g_key_file_load_from_file(key_file, TRACKER_CONFIG_FILE, G_KEY_FILE_NONE, &error)) { if (error) { g_printerr("Failed to load config file: %s\n", error->message); g_error_free(error); } else { g_printerr("Failed to load config file.\n"); } return false; } // Get all keys from tracker config keys = g_key_file_get_keys(key_file, CONFIG_GROUP_TRACKER, nullptr, &error); CHECK_ERROR(error); // Process each key for (gchar **key = keys; *key; key++) { if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_WIDTH)) { gint width = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER, CONFIG_GROUP_TRACKER_WIDTH, &error); CHECK_ERROR(error); g_object_set(G_OBJECT(nvtracker), "user-meta-pool-size", 64, nullptr); g_object_set(G_OBJECT(nvtracker), "tracker-width", width, nullptr); } else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_HEIGHT)) { gint height = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER, CONFIG_GROUP_TRACKER_HEIGHT, &error); CHECK_ERROR(error); g_object_set(G_OBJECT(nvtracker), "tracker-height", height, nullptr); } else if (!g_strcmp0(*key, CONFIG_GPU_ID)) { guint gpu_id = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER, CONFIG_GPU_ID, &error); CHECK_ERROR(error); g_object_set(G_OBJECT(nvtracker), "gpu_id", gpu_id, nullptr); } else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_LL_CONFIG_FILE)) { std::string config_path = g_key_file_get_string(key_file, CONFIG_GROUP_TRACKER, CONFIG_GROUP_TRACKER_LL_CONFIG_FILE, &error); CHECK_ERROR(error); std::string ll_config_file = get_absolute_file_path(TRACKER_CONFIG_FILE, config_path); if (!ll_config_file.empty()) { g_object_set(G_OBJECT(nvtracker), "ll-config-file", ll_config_file.c_str(), nullptr); } } else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_LL_LIB_FILE)) { std::string lib_path = g_key_file_get_string(key_file, CONFIG_GROUP_TRACKER, CONFIG_GROUP_TRACKER_LL_LIB_FILE, &error); CHECK_ERROR(error); std::string ll_lib_file = get_absolute_file_path(TRACKER_CONFIG_FILE, lib_path); if (!ll_lib_file.empty()) { g_object_set(G_OBJECT(nvtracker), "ll-lib-file", ll_lib_file.c_str(), nullptr); } } else { g_printerr("Unknown key '%s' for group [%s]", *key, CONFIG_GROUP_TRACKER); } } ret = true; done: if (error) { g_error_free(error); } if (keys) { g_strfreev(keys); } if (!ret) { g_printerr("%s failed", __func__); } return ret; } int main(int argc, char *argv[]) { GMainLoop *loop = nullptr; GstElement *pipeline = nullptr, *streammux = nullptr, *sink = nullptr, *pgie = nullptr, *queue1 = nullptr, *queue2 = nullptr, *queue3 = nullptr, *nvtracker = nullptr, *nvvidconv = nullptr, *nvosd = nullptr, *tiler = nullptr, *nvdslogger = nullptr, *nvdsanalytics = nullptr, *nvmsgconv = nullptr, *filter1 = nullptr, *nvmsgbroker = nullptr, *tee = nullptr, *queue_kafka = nullptr, *queue_display = nullptr; GstBus *bus = nullptr; guint bus_watch_id; GstPad *nvdsanalytics_src_pad = nullptr; GstPad *pgie_src_pad = nullptr; guint i = 0, num_sources = 0; guint tiler_rows, tiler_columns; guint pgie_batch_size; GstCaps *caps4 = nullptr; guint gpu_id = 0; std::cout << "Initializing OCR Triton client..." << std::endl; try { g_ocr_client = new OCRTritonClient("192.168.1.113:8001", "nvOCDR"); // Test the connection if (g_ocr_client->isServerReady() && g_ocr_client->isModelReady()) { std::cout << "✅ OCR client initialized and verified successfully" << std::endl; } else { std::cerr << "⚠️ OCR client initialized but server/model not ready" << std::endl; delete g_ocr_client; g_ocr_client = nullptr; } } catch (const std::exception& e) { std::cerr << "❌ Failed to initialize OCR client: " << e.what() << std::endl; g_ocr_client = nullptr; } catch (...) { std::cerr << "❌ Unknown error initializing OCR client" << std::endl; g_ocr_client = nullptr; } if (!g_ocr_client) { std::cout << "⚠️ Running without OCR functionality" << std::endl; } // Check for performance mode environment variable const char *perf_env = g_getenv("NVDS_TEST3_PERF_MODE"); PERF_MODE = perf_env && !g_strcmp0(perf_env, "1"); int current_device = -1; cudaGetDevice(¤t_device); struct cudaDeviceProp prop; cudaGetDeviceProperties(&prop, current_device); /* Check input arguments */ if (argc < 2) { std::cerr << "Usage: " << argv[0] << " " << std::endl; std::cerr << "OR: " << argv[0] << " [uri2] ... [uriN]" << std::endl; return -1; } /* Standard GStreamer initialization */ gst_init(&argc, &argv); loop = g_main_loop_new(nullptr, FALSE); /* Create gstreamer elements */ /* Create Pipeline element that will form a connection of other elements */ pipeline = gst_pipeline_new("deepstram_yolo-pipeline"); /* Create nvstreammux instance to form batches from one or more sources. */ streammux = gst_element_factory_make("nvstreammux", "stream-muxer"); if (!pipeline || !streammux) { std::cerr << "One element could not be created. Exiting." << std::endl; return -1; } gst_bin_add(GST_BIN(pipeline), streammux); GList *src_list = nullptr; if (g_str_has_suffix(argv[1], ".yml") || g_str_has_suffix(argv[1], ".yaml")) { nvds_parse_source_list(&src_list, argv[1], "source-list"); GList *temp = src_list; while (temp) { num_sources++; temp = temp->next; } g_list_free(temp); } else { num_sources = argc - 1; } for (i = 0; i < num_sources; i++) { GstPad *sinkpad, *srcpad; gchar pad_name[16] = {}; // std::string pad_name = "sink_" + std::to_string(i); g_snprintf(pad_name, 15, "sink_%u", i); GstElement *source_bin = nullptr; if (g_str_has_suffix(argv[1], ".yml") || g_str_has_suffix(argv[1], ".yaml")) { std::cout << "Now playing : " << static_cast(src_list->data) << std::endl; source_bin = create_source_bin(i, static_cast(src_list->data)); } else { source_bin = create_source_bin(i, argv[i + 1]); } if (!source_bin) { std::cerr << "Failed to create source bin. Exiting." << std::endl; return -1; } gst_bin_add(GST_BIN(pipeline), source_bin); sinkpad = gst_element_get_request_pad(streammux, pad_name); if (!sinkpad) { std::cerr << "Streammux request sink pad failed. Exiting." << std::endl; return -1; } srcpad = gst_element_get_static_pad(source_bin, "src"); if (!srcpad) { std::cerr << "Failed to get src pad of source bin. Exiting." << std::endl; return -1; } if (gst_pad_link(srcpad, sinkpad) != GST_PAD_LINK_OK) { std::cerr << "Failed to link source bin to stream muxer. Exiting." << std::endl; return -1; } gst_object_unref(srcpad); gst_object_unref(sinkpad); if (g_str_has_suffix(argv[1], ".yml") || g_str_has_suffix(argv[1], ".yaml")) { src_list = src_list->next; } } if (g_str_has_suffix(argv[1], ".yml") || g_str_has_suffix(argv[1], ".yaml")) { g_list_free(src_list); } /* Use nvinfer to infer on batched frame. */ pgie = gst_element_factory_make("nvinfer", "primary-nvinference-engine"); nvtracker = gst_element_factory_make("nvtracker", "tracker"); /* Use nvdslogger for perf measurement. */ nvdslogger = gst_element_factory_make("nvdslogger", "nvdslogger"); nvdsanalytics = gst_element_factory_make("nvdsanalytics", "nvdsanalytics"); /* Use nvtiler to composite the batched frames into a 2D tiled array based * on the source of the frames. */ tiler = gst_element_factory_make("nvmultistreamtiler", "nvtiler"); /* Use convertor to convert from NV12 to RGBA as required by nvosd */ nvvidconv = gst_element_factory_make("nvvideoconvert", "nvvideo-converter"); nvvidconv1 = gst_element_factory_make("nvvideoconvert", "converter1"); GstCaps *caps1 = gst_caps_from_string("video/x-raw(memory:NVMM), format=RGBA"); filter1 = gst_element_factory_make("capsfilter", "filter1"); /* Create OSD to draw on the converted RGBA buffer */ nvosd = gst_element_factory_make("nvdsosd", "nv-onscreendisplay"); tee = gst_element_factory_make("tee", "tee"); /* Add queue elements between every two elements */ queue1 = gst_element_factory_make("queue", "queue1"); queue2 = gst_element_factory_make("queue", "queue2"); queue3 = gst_element_factory_make("queue", "queue3"); queue_kafka = gst_element_factory_make("queue", "queue_kafka"); queue_display = gst_element_factory_make("queue", "queue_display"); nvmsgconv = gst_element_factory_make("nvmsgconv", "msgconv"); nvmsgbroker = gst_element_factory_make("nvmsgbroker", "broker"); if (PERF_MODE) { sink = gst_element_factory_make("fakesink", "nvvideo-renderer"); } else { // Remove nvvidconv1 and filter1 as nvv4l2h264enc can work with NVMM memory directly g_object_set(G_OBJECT(nvvidconv1), "nvbuf-memory-type", 2); g_object_set(G_OBJECT(filter1), "caps", caps1); nvv4l2h264enc = gst_element_factory_make("nvv4l2h264enc", "h264-encoder"); parse = gst_element_factory_make("h264parse", "h264-parser2"); rtppay = gst_element_factory_make("rtph264pay", "rtp-payer"); sink = gst_element_factory_make("udpsink", "udp-sink"); // Configure the encoder g_object_set(G_OBJECT(nvv4l2h264enc), "bitrate", 4000000, nullptr); // g_object_set(G_OBJECT(nvv4l2h264enc), "maxperf-enable", TRUE, nullptr); // g_object_set(G_OBJECT(nvv4l2h264enc), "preset-level", 1, nullptr); // 1 = UltraFastPreset // g_object_set(G_OBJECT(nvv4l2h264enc), "insert-sps-pps", TRUE, nullptr); g_object_set(G_OBJECT(nvv4l2h264enc), "gpu-id", gpu_id, nullptr); g_object_set(G_OBJECT(sink), "host", "127.0.0.1", "port", udp_port, "async", FALSE, "sync", 1, nullptr); } if (!pgie || !nvdslogger || !tiler || !nvvidconv || !nvosd || !sink || !nvtracker || !nvdsanalytics || !tee || !nvmsgconv || !nvmsgbroker || !queue_kafka || !queue_display) { std::cerr << "One element could not be created. Exiting." << std::endl; return -1; } if (!PERF_MODE && (!nvv4l2h264enc || !parse || !rtppay)) { std::cerr << "Encoder elements could not be created. Exiting." << std::endl; return -1; } g_object_set(G_OBJECT(streammux), "batch-size", num_sources, nullptr); g_object_set(G_OBJECT(streammux), "width", MUXER_OUTPUT_WIDTH, "height", MUXER_OUTPUT_HEIGHT, "batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, nullptr); /* Configure the nvinfer element using the nvinfer config file. */ g_object_set(G_OBJECT(pgie), "config-file-path", "configs_seg/primary_detection_seg/config_infer_primary_yoloV8_seg.txt", nullptr); if (!set_tracker_properties(nvtracker)) { g_printerr("Failed to set tracker properties. Exiting.\n"); return -1; } /* Configure Kafka messaging components */ std::string lib_path_kafka = "configs/libnvds_kafka_proto.so"; g_object_set(G_OBJECT(nvmsgconv), "config", "configs_seg/msgconv_config.txt", nullptr); g_object_set(G_OBJECT(nvmsgconv), "payload-type", 0, nullptr); g_object_set(G_OBJECT(nvmsgbroker), "proto-lib", lib_path_kafka.c_str(), nullptr); g_object_set(G_OBJECT(nvmsgbroker), "conn-str", "broker;9092;deepstream-analytics", nullptr); g_object_set(G_OBJECT(nvmsgbroker), "sync", false, nullptr); std::cout << "✅ Kafka broker configured: broker:9092, topic: deepstream-analytics" << std::endl; /* Override the batch-size set in the config file with the number of sources. */ g_object_get(G_OBJECT(pgie), "batch-size", &pgie_batch_size, nullptr); if (pgie_batch_size != num_sources) { std::cerr << "WARNING: Overriding infer-config batch-size (" << pgie_batch_size << ") with number of sources (" << num_sources << ")" << std::endl; g_object_set(G_OBJECT(pgie), "batch-size", num_sources, nullptr); } /* Configure the nvdsanalytics element for using the particular analytics config file*/ g_object_set(G_OBJECT(nvdsanalytics), "config-file", "configs_seg/config_nvdsanalytics.txt", nullptr); tiler_rows = static_cast(sqrt(num_sources)); tiler_columns = static_cast(ceil(1.0 * num_sources / tiler_rows)); /* we set the tiler properties here */ g_object_set(G_OBJECT(tiler), "rows", tiler_rows, "columns", tiler_columns, "width", TILED_OUTPUT_WIDTH, "height", TILED_OUTPUT_HEIGHT, nullptr); g_object_set(G_OBJECT(nvosd), "process-mode", OSD_PROCESS_MODE, "display-text", OSD_DISPLAY_TEXT, "display-mask", 1, nullptr); g_object_set(G_OBJECT(nvosd), "gpu-id", gpu_id, nullptr); g_object_set(G_OBJECT(sink), "qos", 1, nullptr); /* we add a message handler */ bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); bus_watch_id = gst_bus_add_watch(bus, bus_call, loop); gst_object_unref(bus); // Add elements to pipeline gst_bin_add_many(GST_BIN(pipeline), queue1, pgie, nvtracker, nvvidconv1, filter1, nvdsanalytics, nvdslogger, tiler, nvvidconv, nvosd, tee, queue_kafka, nvmsgconv, nvmsgbroker, queue_display, nullptr); if (!PERF_MODE) { gst_bin_add_many(GST_BIN(pipeline), nvv4l2h264enc, parse, rtppay, sink, nullptr); } else { gst_bin_add(GST_BIN(pipeline), sink); } /* Link the elements together */ if (!gst_element_link_many(streammux, queue1, pgie, nvtracker, nvvidconv1, filter1, nvdsanalytics, nvdslogger, tiler, nvvidconv, nvosd, tee, nullptr)) { std::cerr << "Elements could not be linked. Exiting." << std::endl; return -1; } std::cout << "Main pipeline elements linked successfully" << std::endl; // Get tee pads for branching GstPad *tee_kafka_pad = gst_element_get_request_pad(tee, "src_%u"); GstPad *tee_display_pad = gst_element_get_request_pad(tee, "src_%u"); if (!tee_kafka_pad || !tee_display_pad) { std::cerr << "Failed to get tee source pads. Exiting." << std::endl; return -1; } // Link tee to queue pads GstPad *queue_kafka_sink_pad = gst_element_get_static_pad(queue_kafka, "sink"); GstPad *queue_display_sink_pad = gst_element_get_static_pad(queue_display, "sink"); if (gst_pad_link(tee_kafka_pad, queue_kafka_sink_pad) != GST_PAD_LINK_OK) { std::cerr << "Failed to link tee to kafka queue. Exiting." << std::endl; return -1; } if (gst_pad_link(tee_display_pad, queue_display_sink_pad) != GST_PAD_LINK_OK) { std::cerr << "Failed to link tee to display queue. Exiting." << std::endl; return -1; } gst_object_unref(queue_kafka_sink_pad); gst_object_unref(queue_display_sink_pad); // Link kafka branch if (!gst_element_link_many(queue_kafka, nvmsgconv, nvmsgbroker, nullptr)) { std::cerr << "Failed to link kafka branch. Exiting." << std::endl; return -1; } std::cout << "✅ Kafka branch linked successfully" << std::endl; // Link display branch if (!PERF_MODE) { // Direct link from queue_display to nvv4l2h264enc (no conversion needed) if (!gst_element_link_many(queue_display, nvv4l2h264enc, parse, rtppay, sink, nullptr)) { std::cerr << "Failed to link display branch. Exiting." << std::endl; return -1; } std::cout << "✅ RTSP display branch linked successfully with nvv4l2h264enc" << std::endl; } else { if (!gst_element_link(queue_display, sink)) { std::cerr << "Failed to link display branch to fakesink. Exiting." << std::endl; return -1; } std::cout << "✅ Performance mode display branch linked successfully" << std::endl; } NvDsObjEncCtxHandle obj_ctx_handle = nvds_obj_enc_create_context(gpu_id); if (!obj_ctx_handle) { std::cout << "Unable to create context\n"; return -1; } nvdsanalytics_src_pad = gst_element_get_static_pad(nvdsanalytics, "src"); if (!nvdsanalytics_src_pad) g_print("Unable to get src pad\n"); else gst_pad_add_probe(nvdsanalytics_src_pad, GST_PAD_PROBE_TYPE_BUFFER, nvdsanalytics_src_pad_buffer_probe, static_cast(obj_ctx_handle), nullptr); gst_object_unref(nvdsanalytics_src_pad); /* Set the pipeline to "playing" state */ std::cout << "Now playing:"; for (i = 0; i < num_sources; i++) { std::cout << " " << argv[i + 1] << ","; } std::cout << std::endl; gst_element_set_state(pipeline, GST_STATE_PLAYING); start_rtsp_streaming(8555 /*rtsp_port*/, udp_port, 0); /* Wait till pipeline encounters an error or EOS */ std::cout << "🚀 Pipeline running with Kafka metadata streaming enabled..." << std::endl; std::cout << "📊 Analytics data will be sent to Kafka topic: deepstream-analytics" << std::endl; std::cout << "📺 RTSP stream available at: rtsp://localhost:8555/ds-test" << std::endl; g_main_loop_run(loop); nvds_obj_enc_destroy_context(obj_ctx_handle); /* Out of the main loop, clean up nicely */ std::cout << "Returned, stopping playback" << std::endl; gst_element_set_state(pipeline, GST_STATE_NULL); std::cout << "Deleting pipeline" << std::endl; gst_object_unref(GST_OBJECT(pipeline)); g_source_remove(bus_watch_id); g_main_loop_unref(loop); destroy_sink_bin(); if (g_ocr_client) { try { delete g_ocr_client; g_ocr_client = nullptr; std::cout << "✅ OCR client cleaned up" << std::endl; } catch (...) { std::cerr << "❌ Error during OCR client cleanup" << std::endl; g_ocr_client = nullptr; } } return 0; }