ZMQ Input Plugin

Description

The ZMQ Input plugin provides high-performance data ingestion capabilities through ZeroMQ (ØMQ) messaging for CVEDIA-RT. It enables the system to receive real-time data streams, subscribe to distributed message publishers, and process incoming data from external ZeroMQ-enabled applications and sensors.

Key input capabilities include:

  • ZeroMQ Integration: Native ZeroMQ messaging library integration for data input
  • Multiple Input Patterns: Support for SUB (subscriber), PULL (pipeline), and REP (reply) socket patterns
  • Real-time Data Streaming: Low-latency data ingestion from external sources
  • Topic-based Filtering: Subscribe to specific data topics and message types
  • Distributed Input: Receive data from multiple distributed publishers
  • Asynchronous Processing: Non-blocking message reception and processing
  • Format Flexibility: JSON-based message format with automatic deserialization
  • Connection Management: Robust connection handling and reconnection logic

Requirements

Software Dependencies

  • ZeroMQ Library (libzmq):
      • ZeroMQ 4.3.x or later (recommended)
      • ZMQ C++ bindings (zmq.hpp)
  • CVEDIA-RT Core - Platform core functionality
  • Network Libraries - TCP/UDP networking support

Platform Requirements

  • Windows: ZeroMQ Windows packages or vcpkg installation
  • Linux: ZeroMQ development package (libzmq3-dev on Debian/Ubuntu; the matching runtime library package is libzmq5)
  • Network Configuration: Accessible network ports for ZMQ communication

Hardware Requirements

  • Sufficient network bandwidth for expected data throughput
  • Memory capacity for message buffering and processing
  • CPU resources for message deserialization and processing

Configuration

Basic Input Configuration

{
  "zmq_server": "tcp://*:5555",
  "socket_type": "SUB",
  "topics": ["sensor_data", "camera_feed"],
  "buffer_size": 1000
}
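
The `zmq_server` value follows ZeroMQ's `transport://address` endpoint form, and the `*` host is a wildcard that ZeroMQ accepts only when binding. As a rough illustration, the sketch below splits a TCP endpoint into host and port and flags the wildcard case. `Endpoint` and `parseTcpEndpoint` are hypothetical helpers written for this page, not part of the plugin API, and whether the plugin binds or connects for a given endpoint is not confirmed here.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical helper type: the pieces of a "tcp://host:port" endpoint.
struct Endpoint {
    std::string host;
    int port = 0;
    bool wildcard = false;  // true for "*", which ZeroMQ accepts only for bind
};

// Parse "tcp://host:port" with a numeric port. Returns false for anything else.
bool parseTcpEndpoint(const std::string& url, Endpoint& out) {
    const std::string prefix = "tcp://";
    if (url.compare(0, prefix.size(), prefix) != 0) return false;

    // The last ':' separates host and port; it must come after a non-empty host.
    const std::size_t colon = url.rfind(':');
    if (colon == std::string::npos || colon <= prefix.size()) return false;

    const std::string portText = url.substr(colon + 1);
    if (portText.empty() || portText.size() > 5) return false;
    for (char c : portText) {
        if (!std::isdigit(static_cast<unsigned char>(c))) return false;
    }
    const int port = std::stoi(portText);
    if (port < 1 || port > 65535) return false;

    out.host = url.substr(prefix.size(), colon - prefix.size());
    out.port = port;
    out.wildcard = (out.host == "*");
    return true;
}
```

With this sketch, `tcp://*:5555` parses with `wildcard == true`, while `tcp://192.168.1.100:5555` yields a concrete host to connect to.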

Advanced Input Configuration

{
  "zmq_server": "tcp://192.168.1.100:5555",
  "socket_type": "PULL",
  "topics": ["alerts", "detections", "analytics"],
  "buffer_size": 5000,
  "timeout_ms": 1000,
  "reconnect_interval": 5000,
  "high_water_mark": 10000,
  "linger_time": 1000
}
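
The `reconnect_interval` setting describes a fixed delay between connection attempts. How the plugin schedules retries internally is not documented here, so the following is only a sketch of fixed-interval retry timing; `ReconnectTimer` is a hypothetical class, not part of the plugin.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: decides when the next connection attempt is due,
// using a fixed retry interval such as reconnect_interval.
class ReconnectTimer {
public:
    using Clock = std::chrono::steady_clock;

    explicit ReconnectTimer(std::chrono::milliseconds interval)
        : interval_(interval) {}

    // True if no attempt has been made yet or the interval has elapsed.
    bool due(Clock::time_point now) const {
        return !attempted_ || (now - lastAttempt_) >= interval_;
    }

    // Call this each time a connection attempt is started.
    void recordAttempt(Clock::time_point now) {
        lastAttempt_ = now;
        attempted_ = true;
    }

private:
    std::chrono::milliseconds interval_;
    Clock::time_point lastAttempt_{};
    bool attempted_ = false;
};
```

A caller would check `due(Clock::now())` in its polling loop and call `recordAttempt` before each attempt. Passing the time in explicitly keeps the logic easy to test.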

Multi-Source Input Configuration

{
  "inputs": [
    {
      "zmq_server": "tcp://sensor1.local:5555",
      "socket_type": "SUB",
      "topics": ["temperature", "humidity"]
    },
    {
      "zmq_server": "tcp://camera-server:5556", 
      "socket_type": "PULL",
      "topics": ["frame_data"]
    }
  ],
  "buffer_size": 2000,
  "enable_filtering": true
}

Configuration Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| zmq_server | string | "tcp://*:5555" | ZMQ server endpoint for input connections |
| socket_type | string | "SUB" | ZMQ socket pattern (SUB, PULL, REP) |
| topics | array | [] | List of topics to subscribe to (SUB sockets only) |
| buffer_size | integer | 1000 | Input message buffer size |
| timeout_ms | integer | 1000 | Receive timeout in milliseconds |
| reconnect_interval | integer | 5000 | Reconnection interval in milliseconds |
| high_water_mark | integer | 1000 | Maximum queued messages per socket |
| linger_time | integer | 0 | Socket linger time on close |
| enable_filtering | boolean | false | Enable topic-based message filtering |
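
To make the defaults above concrete, here is a sketch of a struct that mirrors the documented parameters, plus a small sanity check. `ZmqInputConfig` and `validate` are hypothetical names, and the checks are assumptions drawn from the descriptions in the table; the plugin may accept or reject different values.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical mirror of the documented parameters and their defaults.
// This is not the plugin's own configuration type.
struct ZmqInputConfig {
    std::string zmq_server = "tcp://*:5555";
    std::string socket_type = "SUB";
    std::vector<std::string> topics;
    int buffer_size = 1000;
    int timeout_ms = 1000;
    int reconnect_interval = 5000;
    int high_water_mark = 1000;
    int linger_time = 0;
    bool enable_filtering = false;
};

// Returns human-readable problems; an empty list means nothing was flagged.
std::vector<std::string> validate(const ZmqInputConfig& c) {
    std::vector<std::string> problems;
    if (c.socket_type != "SUB" && c.socket_type != "PULL" && c.socket_type != "REP") {
        problems.push_back("socket_type must be SUB, PULL, or REP");
    }
    // The table describes topics as applying to SUB sockets only.
    if (!c.topics.empty() && c.socket_type != "SUB") {
        problems.push_back("topics only apply to SUB sockets");
    }
    if (c.buffer_size <= 0) problems.push_back("buffer_size must be positive");
    if (c.timeout_ms < 0) problems.push_back("timeout_ms must not be negative");
    if (c.reconnect_interval < 0) {
        problems.push_back("reconnect_interval must not be negative");
    }
    return problems;
}
```

A default-constructed `ZmqInputConfig` passes these checks. Setting `socket_type` to `"PULL"` while leaving entries in `topics` is flagged, since the topic list would have no effect on a PULL socket.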

Supported ZMQ Input Patterns

SUB (Subscriber) Pattern

  • Use Case: Subscribe to data from one or more publishers
  • Socket Type: ZMQ_SUB
  • Connection: Connects to publisher endpoints
  • Topics: Supports topic-based filtering
  • Scalability: Many subscribers can connect to one publisher

{
  "socket_type": "SUB",
  "zmq_server": "tcp://publisher.example.com:5555",
  "topics": ["sensor_data", "alerts"]
}
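
In ZeroMQ itself, a SUB socket's topic filter is a byte-prefix match against the start of each message: a subscription to `alerts` also matches `alerts_low`, an empty subscription matches everything, and a socket with no subscriptions receives nothing. The sketch below reproduces that matching rule in plain C++. `matchesFilter` and `accepted` are hypothetical helper names, and how the plugin maps an empty `topics` list onto subscriptions is not confirmed here.

```cpp
#include <cassert>
#include <string>
#include <vector>

// True when `message` starts with `filter`, which is how ZeroMQ SUB
// subscriptions are matched. An empty filter matches every message.
bool matchesFilter(const std::string& message, const std::string& filter) {
    return message.compare(0, filter.size(), filter) == 0;
}

// True when at least one subscription matches. With no subscriptions
// at all, nothing is accepted, mirroring a freshly created SUB socket.
bool accepted(const std::string& message,
              const std::vector<std::string>& filters) {
    for (const std::string& f : filters) {
        if (matchesFilter(message, f)) return true;
    }
    return false;
}
```

Because matching is by prefix, topic names that share a common stem (for example `sensor` and `sensor_data`) can overlap. Choosing distinct prefixes avoids receiving unintended messages.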

PULL (Pipeline) Pattern

  • Use Case: Receive work items from pushers in a load-balanced manner, for example distributed task processing
  • Socket Type: ZMQ_PULL
  • Connection: Binds to receive from multiple pushers
  • Distribution: Each PUSH peer distributes its messages round-robin among the connected pullers

{
  "socket_type": "PULL", 
  "zmq_server": "tcp://*:5556"
}
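
The load balancing in this pattern happens on the sending side: a ZeroMQ PUSH socket hands each outgoing message to its connected PULL peers in turn. The sketch below simulates that round-robin assignment without any sockets. `distributeRoundRobin` is a hypothetical function for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simulates a PUSH socket handing messages to `workerCount` PULL peers in
// turn. Returns one inbox per worker. With zero workers this returns no
// inboxes (a real PUSH socket would block instead of discarding messages).
std::vector<std::vector<std::string>> distributeRoundRobin(
        const std::vector<std::string>& messages, std::size_t workerCount) {
    std::vector<std::vector<std::string>> inbox(workerCount);
    if (workerCount == 0) return inbox;
    for (std::size_t i = 0; i < messages.size(); ++i) {
        inbox[i % workerCount].push_back(messages[i]);
    }
    return inbox;
}
```

With five messages and two workers, the first worker receives three messages and the second receives two, so a given PULL input sees only part of the stream.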

REP (Reply) Pattern

  • Use Case: Receive requests and send replies (request-response pattern), for example an API endpoint or service queries
  • Socket Type: ZMQ_REP
  • Connection: Binds to receive requests from REQ sockets
  • Protocol: Synchronous request-response messaging

{
  "socket_type": "REP",
  "zmq_server": "tcp://*:5557"
}
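
A ZeroMQ REP socket enforces a strict cycle: receive one request, send one reply, then receive again. Calling the operations out of order fails at the socket level. The sketch below tracks that cycle so application code can check its own ordering. `RepCycle` is a hypothetical helper and does not wrap a real socket.

```cpp
#include <cassert>

// Hypothetical guard that tracks the receive -> send alternation
// required by a REP socket.
class RepCycle {
public:
    bool canReceive() const { return !awaitingReply_; }
    bool canSend() const { return awaitingReply_; }

    // Record a received request. Fails if a reply is still owed.
    bool onReceive() {
        if (awaitingReply_) return false;
        awaitingReply_ = true;
        return true;
    }

    // Record a sent reply. Fails if no request is pending.
    bool onSend() {
        if (!awaitingReply_) return false;
        awaitingReply_ = false;
        return true;
    }

private:
    bool awaitingReply_ = false;
};
```

In practice this means every request read from a REP input needs a reply, even an empty one, before the next request can be read.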

API Reference

C++ API - ZMQCore Class

Connection Management

bool connect(const std::string& zmqServer);  // Connect to ZMQ endpoint
bool disconnect();                           // Disconnect from endpoint
bool isConnected();                         // Check connection status

Input Operations

nlohmann::json readMessage();               // Read incoming message (blocking)
expected<pCValue> readMessage();            // Read message as CValue object
void eventHandler(pCValue val);             // Handle incoming events

Configuration

struct config {
    std::string mqtt_server = "";           // ZMQ server endpoint
    std::string client_id = "";             // Client identifier
    std::string topic = "";                 // Default topic filter
};

Lua API - ZMQ Input Interface

Connection Methods

-- Connect to ZMQ server for input
zmq:connect()                               -- Connect using configured server
zmq:connect(server_url)                     -- Connect to specific server

-- Check connection status  
local connected = zmq:isConnected()

Message Reading

-- Read incoming messages
local message = zmq:readMessage()           -- Read next message (blocking)
if message and message ~= "" then
    -- Process received message
    processIncomingData(message)
end

Configuration Management

-- Get current configuration
local config = zmq:getConfig()

-- Update configuration for input
zmq:saveConfig({
    zmq_server = "tcp://data-source:5555",
    socket_type = "SUB", 
    topics = {"sensor_data", "camera_feed"}
})

Examples

Basic Data Subscription

-- Create ZMQ input instance
local zmq_input = api.factory.zmq.create(instance, "data_subscriber")

-- Configure for subscribing to sensor data
zmq_input:saveConfig({
    zmq_server = "tcp://sensor-hub:5555",
    socket_type = "SUB",
    topics = {"temperature", "pressure", "humidity"}
})

-- Connect and start receiving
zmq_input:connect()

-- Message processing loop
while true do
    local message = zmq_input:readMessage()
    if message and message ~= "" then
        -- Parse and process sensor data
        local data = json.decode(message)
        processSensorReading(data)
    end
end

Pipeline Worker Input

// Create ZMQ input for pipeline processing
ZMQCore inputWorker;

// Configure as pipeline worker (PULL socket)
inputWorker.pluginConf.mqtt_server = "tcp://*:5556";
inputWorker.connect("tcp://*:5556");

// Process incoming work items
while (inputWorker.isConnected()) {
    auto workItem = inputWorker.readMessage();
    if (!workItem.empty()) {
        // Process work item
        processWorkItem(workItem);
    }
}

Multi-Topic Subscriber with Filtering

-- Configure multi-topic subscriber
local subscriber = api.factory.zmq.create(instance, "multi_subscriber")

subscriber:saveConfig({
    zmq_server = "tcp://data-publisher:5555",
    socket_type = "SUB",
    topics = {"alerts", "detections", "analytics"},
    enable_filtering = true,
    buffer_size = 5000
})

subscriber:connect()

-- Message processing with topic filtering
function processMessage()
    local message = subscriber:readMessage()
    if message and message ~= "" then
        local data = json.decode(message)

        -- Route based on topic
        if data.topic == "alerts" then
            handleAlert(data.payload)
        elseif data.topic == "detections" then
            handleDetection(data.payload)
        elseif data.topic == "analytics" then
            handleAnalytics(data.payload)
        end
    end
end
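
The same topic-based routing can be expressed in C++ with a lookup table instead of an if/elseif chain. This is a sketch under the assumption that the topic and payload have already been extracted from the decoded message; `TopicRouter` is a hypothetical class, not part of the plugin API.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical router: maps a topic name to the handler for its payload.
class TopicRouter {
public:
    using Handler = std::function<void(const std::string& payload)>;

    // Register (or replace) the handler for a topic.
    void on(const std::string& topic, Handler handler) {
        handlers_[topic] = std::move(handler);
    }

    // Invoke the handler for `topic`. Returns false if none is registered.
    bool dispatch(const std::string& topic, const std::string& payload) const {
        const auto it = handlers_.find(topic);
        if (it == handlers_.end()) return false;
        it->second(payload);
        return true;
    }

private:
    std::map<std::string, Handler> handlers_;
};
```

Returning `false` for unknown topics lets the caller log or count unrouted messages rather than dropping them silently.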

Request-Response Input Service

// Create ZMQ service input (REP socket)
ZMQCore serviceInput;
serviceInput.connect("tcp://*:5557");

// Service request processing loop
while (serviceInput.isConnected()) {
    // Receive request
    auto request = serviceInput.readMessage();

    if (!request.empty()) {
        // Process request
        auto response = processServiceRequest(request);

        // Send reply (REP socket requirement)
        serviceInput.writeMessage("", response);
    }
}

Performance Considerations

Input Throughput Optimization

  • Buffer Sizing: Configure appropriate buffer sizes based on expected message rates
  • High Water Mark: Set HWM to prevent memory issues during traffic spikes
  • Topic Filtering: Use specific topic subscriptions to reduce unnecessary data processing
  • Asynchronous Processing: Process messages on separate threads to avoid blocking input
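
The buffer sizing and high water mark advice above comes down to bounding how many messages may wait in memory. As a minimal sketch of that idea, the class below refuses new messages once a fixed capacity is reached and counts the refusals. `BoundedBuffer` is hypothetical, is not thread-safe, and is not how the plugin necessarily implements `buffer_size`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical bounded FIFO: rejects new messages once `capacity` is reached.
// Not thread-safe; add a mutex before sharing it between threads.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Returns false and counts a drop when the buffer is full.
    bool push(std::string message) {
        if (queue_.size() >= capacity_) {
            ++dropped_;
            return false;
        }
        queue_.push_back(std::move(message));
        return true;
    }

    // Moves the oldest message into `out`. Returns false when empty.
    bool pop(std::string& out) {
        if (queue_.empty()) return false;
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t capacity_;
    std::size_t dropped_ = 0;
    std::deque<std::string> queue_;
};
```

Watching the drop counter over time is one way to tell whether the buffer is too small or the consumer is too slow.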

Memory Management

  • Message Buffering: Monitor memory usage with high-volume message streams
  • Buffer Limits: Implement buffer size limits to prevent out-of-memory conditions
  • Garbage Collection: Regular cleanup of processed messages and connections
  • Connection Pooling: Reuse connections when possible to reduce overhead

Network Optimization

  • Connection Types: Choose appropriate transport (TCP, IPC, inproc) based on use case
  • Compression: Consider message compression for large payloads
  • Batching: Process messages in batches when possible for better throughput
  • Keep-Alive: Configure appropriate heartbeat and keep-alive settings
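
The batching suggestion above can be sketched as draining a bounded number of already-received messages in one step, then processing them together. `takeBatch` is a hypothetical helper that assumes messages have been queued as strings.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Remove and return up to `maxBatch` messages from the front of `pending`,
// preserving arrival order.
std::vector<std::string> takeBatch(std::deque<std::string>& pending,
                                   std::size_t maxBatch) {
    std::vector<std::string> batch;
    while (!pending.empty() && batch.size() < maxBatch) {
        batch.push_back(std::move(pending.front()));
        pending.pop_front();
    }
    return batch;
}
```

Capping the batch size keeps per-batch latency predictable: a larger cap improves throughput but delays the first message of each batch.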

Troubleshooting

Input Connection Issues

Cannot connect to ZMQ publisher

  • Verify the publisher is running and accessible
  • Check network connectivity and firewall settings
  • Ensure ZMQ server endpoint is correctly configured
  • Validate port availability and permissions

Messages not being received

  • Check topic subscription configuration (SUB sockets)
  • Verify publisher is sending to subscribed topics
  • Monitor network traffic and ZMQ socket status
  • Check buffer sizes and high water mark settings

Performance Issues

High message latency

  • Reduce buffer sizes for lower latency
  • Optimize message processing logic
  • Consider using faster transport methods (IPC vs TCP)
  • Monitor system resources and network bandwidth

Memory usage growing continuously

  • Implement proper message buffer limits
  • Check for message processing bottlenecks
  • Monitor garbage collection and cleanup processes
  • Adjust high water mark settings

Data Processing Issues

Message format errors

  • Validate JSON message structure and encoding
  • Check for message truncation or corruption
  • Verify data serialization/deserialization logic
  • Monitor message size limits and constraints

Missing or duplicate messages

  • Check ZMQ socket patterns and message delivery guarantees
  • Implement message acknowledgment where needed
  • Monitor network reliability and connection stability
  • Consider implementing message sequence numbers
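
As one way to apply the sequence-number suggestion, the sketch below classifies each incoming number as in order, a gap, or a duplicate. It assumes the publisher adds a monotonically increasing counter to every message, which the message format described on this page does not include by default. `SequenceTracker` and `SeqStatus` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

enum class SeqStatus { InOrder, Gap, Duplicate };

// Hypothetical tracker for the sequence numbers of a single publisher.
class SequenceTracker {
public:
    // Classify `seq` against the next expected number and update state.
    SeqStatus observe(std::uint64_t seq) {
        if (!started_) {
            // The first message seen sets the baseline.
            started_ = true;
            next_ = seq + 1;
            return SeqStatus::InOrder;
        }
        // Anything below the expected number is a repeat or a late arrival.
        if (seq < next_) return SeqStatus::Duplicate;

        const SeqStatus status =
            (seq == next_) ? SeqStatus::InOrder : SeqStatus::Gap;
        missed_ += seq - next_;  // zero when in order
        next_ = seq + 1;
        return status;
    }

    // Total number of sequence numbers skipped so far.
    std::uint64_t missed() const { return missed_; }

private:
    bool started_ = false;
    std::uint64_t next_ = 0;
    std::uint64_t missed_ = 0;
};
```

With multiple publishers, one tracker per publisher is needed, since each source counts independently.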

Error Messages

  • "ZMQ Connection failed": Check server availability and network connectivity
  • "Socket bind failed": Verify port availability and permissions
  • "Message buffer overflow": Increase buffer size or improve processing speed
  • "Topic subscription failed": Check topic name format and publisher configuration

See Also