Skip to content

ZMQ Plugin

Description

The ZMQ plugin provides high-performance messaging integration using the ZeroMQ protocol through the Output plugin system. It registers the zmq:// URI scheme and serves as an output handler for streaming analytics data to ZeroMQ consumers. The plugin supports multiple ZeroMQ patterns and provides scalable, low-latency data distribution capabilities.

Key Features

  • Output Handler Integration: Registered zmq:// URI scheme with Output plugin
  • Multiple ZMQ Patterns: PUB, PUSH, REQ socket types for different messaging patterns
  • High Performance: Asynchronous, non-blocking message publishing
  • JSON Message Formatting: Structured data export with automatic serialization
  • Event-driven Architecture: Real-time analytics streaming integration
  • Connection Management: Automatic connection handling and error recovery
  • Distributed Messaging: Scalable messaging for distributed systems
  • Topic-based Routing: Flexible topic-based message distribution
  • TCP/IPC/InProc Transport: Multiple transport protocols support

When to Use

  • Real-time analytics data streaming to distributed systems
  • High-frequency data publishing with low latency requirements
  • Microservices event-driven architectures
  • Integration with ZMQ-based message brokers and consumers
  • Scalable data distribution for large deployments
  • Inter-process communication (IPC) for local system integration
  • Queue-based data processing workflows
  • Edge-to-cloud data streaming applications

Requirements

Software Dependencies

  • ZeroMQ library (libzmq) version 4.0 or higher
  • Network connectivity for TCP transport
  • IPC support for inter-process communication
  • C++ runtime compatible with ZMQ bindings

Network Requirements

  • TCP/IP connectivity for remote ZMQ endpoints
  • Default ZMQ ports: User-configurable (commonly 5555, 5556, etc.)
  • Firewall rules allowing outbound connections
  • Low-latency network for real-time streaming applications

Configuration

The ZMQ plugin is configured through the Output plugin system using the zmq:// URI scheme.

Basic Configuration

{
  "output": {
    "handlers": {
      "zmq-publisher": {
        "uri": "zmq://tcp://*:5555",
        "sink": "events",
        "enabled": true,
        "config": {
          "socket_type": "PUB",
          "topic": "analytics.events",
          "high_water_mark": 1000
        }
      }
    }
  }
}

Advanced Configuration

{
  "output": {
    "handlers": {
      "zmq-analytics": {
        "uri": "zmq://tcp://analytics-server:5555",
        "sink": "events",
        "enabled": true,
        "config": {
          "socket_type": "PUSH",
          "topic": "analytics.detections",
          "high_water_mark": 5000,
          "linger": 1000,
          "send_timeout": 100,
          "reconnect_interval": 5000,
          "max_reconnect_attempts": 10,
          "queue_size": 10000
        }
      }
    }
  }
}

Multi-Pattern Configuration

{
  "output": {
    "handlers": {
      "zmq-publisher": {
        "uri": "zmq://tcp://*:5555",
        "sink": "events",
        "enabled": true,
        "config": {
          "socket_type": "PUB",
          "topic": "analytics.events",
          "high_water_mark": 1000
        }
      },
      "zmq-worker": {
        "uri": "zmq://tcp://work-queue:5556",
        "sink": "processing_tasks",
        "enabled": true,
        "config": {
          "socket_type": "PUSH",
          "high_water_mark": 5000,
          "send_timeout": 50
        }
      },
      "zmq-request": {
        "uri": "zmq://tcp://service-endpoint:5557",
        "sink": "service_requests",
        "enabled": true,
        "config": {
          "socket_type": "REQ",
          "request_timeout": 5000,
          "retries": 3
        }
      }
    }
  }
}

Configuration Schema

Parameter Type Default Description
uri string required ZMQ endpoint URI (zmq://transport://address:port)
sink string "events" Data sink to connect to
enabled boolean true Enable/disable ZMQ output
config.socket_type string "PUSH" ZMQ socket type (PUB, PUSH, REQ)
config.topic string "" Topic prefix for PUB sockets
config.high_water_mark integer 1000 Maximum queued messages
config.linger integer 1000 Socket linger time in milliseconds
config.send_timeout integer 100 Send operation timeout in milliseconds
config.reconnect_interval integer 5000 Reconnection interval in milliseconds
config.max_reconnect_attempts integer -1 Maximum reconnection attempts (-1 = infinite)
config.queue_size integer 1000 Internal message queue size
config.request_timeout integer 5000 Request timeout for REQ sockets (milliseconds)
config.retries integer 3 Number of retries for REQ sockets

Supported ZMQ Patterns

Publisher (PUB)

Publisher pattern for one-to-many data distribution. Subscribers connect and receive messages matching their subscriptions.

{
  "config": {
    "socket_type": "PUB",
    "topic": "analytics.detections",
    "high_water_mark": 1000
  }
}

Use cases: - Broadcasting analytics events to multiple consumers - Real-time data feeds for monitoring systems - Event notification distribution

Push

Push pattern for distributing work to multiple workers. Messages are load-balanced across connected workers.

{
  "config": {
    "socket_type": "PUSH",
    "high_water_mark": 5000,
    "send_timeout": 100
  }
}

Use cases: - Work distribution to processing workers - Load balancing analytics tasks - Queue-based processing workflows

Request (REQ)

Request pattern for client-server communication. Each message expects a reply from the server.

{
  "config": {
    "socket_type": "REQ",
    "request_timeout": 5000,
    "retries": 3
  }
}

Use cases: - Service API calls - Command-response communication - Synchronous data requests

URI Scheme

The ZMQ plugin registers the zmq:// URI scheme with the Output plugin system.

URI Format

zmq://transport://address:port

Supported Transports: - tcp: TCP transport for network communication - ipc: Inter-process communication (Unix domain sockets) - inproc: In-process communication (thread-to-thread)

URI Examples

zmq://tcp://*:5555                    # Bind to all interfaces, port 5555
zmq://tcp://localhost:5555            # Connect to localhost
zmq://tcp://analytics-server:5556     # Connect to remote server
zmq://ipc:///tmp/analytics.ipc        # IPC socket (Unix)
zmq://inproc://analytics              # In-process communication

API Reference

C++ API

The ZMQ plugin implements both the standalone iface::ZMQ interface and the iface::OutputHandler interface:

// Output Handler Interface (registered with Output plugin)
class ZMQOutputHandler : public iface::OutputHandler {
public:
    // Constructor for output handler
    ZMQOutputHandler(const std::string& moduleName,
                    const std::string& schema, 
                    const std::string& sink,
                    const std::string& uri, 
                    pCValue config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // Output handler methods
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;
    std::string getSink() override;
};

// Standalone ZMQ Interface
class ZMQImpl : public iface::ZMQ {
public:
    // Connection methods
    bool connect();
    bool connect(const std::string& zmqServer);
    bool disconnect();
    bool isConnected() const;

    // Message methods
    expected<pCValue> readMessage();
    void writeMessage(const std::string& topic, pCValue msg);

    // Event handling
    void eventHandler(pCValue val);

    // Configuration methods
    bool setConfig(pCValue config);
    pCValue getConfig() const;
};

Lua API

The ZMQ plugin is accessible through both the Output plugin system and standalone factory:

-- Via Output plugin system (recommended for data export)
local output = api.factory.output.create(instance, "ZMQExporter")

-- Add ZMQ output handler
local zmqHandler = output:addHandler(
    "zmq-publisher",
    "zmq://tcp://*:5555",
    "events",
    {
        socket_type = "PUB",
        topic = "analytics.events",
        high_water_mark = 1000
    }
)

-- Standalone ZMQ client (for direct control)
local zmq = api.factory.zmq.create(instance, "ZMQClient")

-- Connection methods
zmq:connect()  -- Uses configuration
zmq:connect("zmq://tcp://localhost:5555")  -- Direct connection

-- Message methods
local message = zmq:readMessage()  -- Returns CValue
zmq:writeMessage("topic", message)  -- Send CValue

-- Configuration
local config = zmq:getConfig()
zmq:saveConfig(newConfig)

Full Lua API Reference →

Examples

Basic Event Publishing

-- Create output instance for ZMQ publishing
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQEventPublisher")

-- Configure ZMQ publisher for analytics events
local pubConfig = {
    socket_type = "PUB",
    topic = "analytics.events",
    high_water_mark = 1000,
    linger = 1000
}

local pubHandler = output:addHandler(
    "event-publisher",
    "zmq://tcp://*:5555",
    "events",
    pubConfig
)

if pubHandler then
    api.logging.LogInfo("ZMQ event publishing configured")
    api.logging.LogInfo("Socket: PUB on tcp://*:5555")
    api.logging.LogInfo("Topic: analytics.events")
    api.logging.LogInfo("High Water Mark: 1000")

    -- Events from the "events" sink will be automatically published
    -- Subscribers can connect to tcp://your-server:5555
else
    api.logging.LogError("Failed to create ZMQ publisher handler")
end

Work Distribution with PUSH Pattern

local output = api.factory.output.create(instance, "ZMQWorkDistributor")

-- Configure ZMQ PUSH socket for work distribution
local pushConfig = {
    socket_type = "PUSH",
    high_water_mark = 5000,
    send_timeout = 100,
    reconnect_interval = 5000
}

local pushHandler = output:addHandler(
    "work-distributor",
    "zmq://tcp://work-queue:5556",
    "processing_tasks",
    pushConfig
)

if pushHandler then
    api.logging.LogInfo("ZMQ work distribution configured")
    api.logging.LogInfo("Socket: PUSH to tcp://work-queue:5556")
    api.logging.LogInfo("Processing tasks will be distributed to workers")

    -- Workers should use PULL sockets to receive work
    -- Load balancing is handled automatically by ZMQ
else
    api.logging.LogError("Failed to create ZMQ work distributor")
end

Multi-Pattern Data Streaming

local output = api.factory.output.create(instance, "ZMQMultiStream")

-- Configure multiple ZMQ patterns for different data types
local zmqConfigs = {
    {
        name = "event-broadcast",
        uri = "zmq://tcp://*:5555",
        sink = "events",
        config = {
            socket_type = "PUB",
            topic = "analytics.events",
            high_water_mark = 1000
        }
    },
    {
        name = "work-distribution",
        uri = "zmq://tcp://work-queue:5556",
        sink = "processing_tasks",
        config = {
            socket_type = "PUSH",
            high_water_mark = 5000,
            send_timeout = 50
        }
    },
    {
        name = "diagnostics-feed",
        uri = "zmq://tcp://*:5557",
        sink = "diagnostics",
        config = {
            socket_type = "PUB", 
            topic = "system.diagnostics",
            high_water_mark = 500
        }
    }
}

-- Create all ZMQ handlers
local activeHandlers = {}
for _, config in ipairs(zmqConfigs) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(activeHandlers, config.name)
        api.logging.LogInfo("Created ZMQ handler: " .. config.name)
        api.logging.LogInfo("  Pattern: " .. config.config.socket_type)
        api.logging.LogInfo("  URI: " .. config.uri)
        if config.config.topic then
            api.logging.LogInfo("  Topic: " .. config.config.topic)
        end
    else
        api.logging.LogError("Failed to create handler: " .. config.name)
    end
end

api.logging.LogInfo("Active ZMQ handlers: " .. #activeHandlers)

IPC Communication

local output = api.factory.output.create(instance, "ZMQLocalIPC")

-- Configure IPC communication for local processes
local ipcConfig = {
    socket_type = "PUB",
    topic = "local.analytics",
    high_water_mark = 2000,
    linger = 500
}

local ipcHandler = output:addHandler(
    "local-ipc",
    "zmq://ipc:///tmp/cvedia-analytics.ipc",
    "events",
    ipcConfig
)

if ipcHandler then
    api.logging.LogInfo("ZMQ IPC communication configured")
    api.logging.LogInfo("Socket: /tmp/cvedia-analytics.ipc")
    api.logging.LogInfo("Local processes can subscribe using:")
    api.logging.LogInfo("  zmq://ipc:///tmp/cvedia-analytics.ipc")

    -- Other processes on the same machine can connect via IPC
    -- More efficient than TCP for local communication
else
    api.logging.LogError("Failed to create ZMQ IPC handler")
end

Request-Response Service Integration

local output = api.factory.output.create(instance, "ZMQServiceClient")

-- Configure REQ socket for service communication
local reqConfig = {
    socket_type = "REQ",
    request_timeout = 5000,
    retries = 3,
    reconnect_interval = 2000
}

local reqHandler = output:addHandler(
    "service-client",
    "zmq://tcp://api-service:5558",
    "service_requests",
    reqConfig
)

if reqHandler then
    api.logging.LogInfo("ZMQ service client configured")
    api.logging.LogInfo("Socket: REQ to tcp://api-service:5558")
    api.logging.LogInfo("Request timeout: 5000ms")
    api.logging.LogInfo("Max retries: 3")

    -- Service requests from "service_requests" sink
    -- will be sent to the service and responses handled
else
    api.logging.LogError("Failed to create ZMQ service client")
end

Standalone ZMQ Usage

-- For direct ZMQ control (not through Output plugin)
local zmq = api.factory.zmq.create(instance, "DirectZMQ")

-- Connect to ZMQ endpoint
local success = zmq:connect("zmq://tcp://localhost:5555")
if not success then
    api.logging.LogError("Failed to connect to ZMQ endpoint")
    return
end

-- Create custom message
local customMessage = {
    timestamp = os.time(),
    source = "cvedia-rt",
    data = "Custom analytics data",
    metadata = {
        camera_id = "cam-001",
        event_type = "custom_event"
    }
}

-- Send message (will be JSON serialized)
zmq:writeMessage("custom.topic", customMessage)

-- For subscriber pattern, you would typically read messages
local receivedMessage = zmq:readMessage()
if receivedMessage then
    api.logging.LogInfo("Received: " .. receivedMessage:str())
end

Performance Considerations

Message Throughput

  1. High Water Mark: Set appropriate limits to prevent memory buildup
  2. Use higher values for burst scenarios
  3. Monitor memory usage with high throughput
  4. Consider disk swapping for very large queues

  5. Send Timeout: Balance responsiveness vs reliability

  6. Shorter timeouts for real-time applications
  7. Longer timeouts for guaranteed delivery
  8. Monitor timeout events and adjust accordingly

  9. Socket Types: Choose optimal pattern for your use case

  10. PUB/SUB for broadcast scenarios
  11. PUSH/PULL for work distribution
  12. REQ/REP for request-response patterns

Network Optimization

  1. Transport Selection:
  2. TCP for network communication
  3. IPC for local processes (higher performance)
  4. InProc for thread-to-thread communication

  5. Connection Management:

  6. Enable reconnection for production deployments
  7. Use exponential backoff for retry intervals
  8. Monitor connection status and log events

  9. Message Size:

  10. Keep messages under 1MB for optimal performance
  11. Use compression for large payloads
  12. Consider message batching for small, frequent messages

Resource Management

  1. Memory Usage:
  2. Monitor queue sizes and memory consumption
  3. Set appropriate high water marks
  4. Implement message flow control if needed

  5. CPU Usage:

  6. ZMQ is generally CPU-efficient
  7. Monitor JSON serialization overhead
  8. Consider message format optimization

  9. File Descriptors:

  10. Each ZMQ socket uses file descriptors
  11. Monitor system limits for high-connection scenarios
  12. Clean up unused connections properly

Troubleshooting

Connection Issues

  1. "Address already in use"
  2. Check if another process is using the port
  3. Use netstat -an | grep :5555 to verify port usage
  4. Consider using different ports or SO_REUSEADDR
  5. Ensure proper socket cleanup on application restart

  6. "Connection refused"

  7. Verify ZMQ endpoint is listening: telnet host port
  8. Check network connectivity and firewall rules
  9. Validate URI format and transport type
  10. Ensure ZMQ server is started before client connection

  11. "Context terminated"

  12. ZMQ context was shut down while socket was active
  13. Ensure proper shutdown sequence (sockets before context)
  14. Check for application termination issues
  15. Monitor context lifecycle in multi-threaded scenarios

Message Delivery Issues

  1. Messages not being sent
  2. Check Output plugin handler registration and status
  3. Verify sink data is available and in correct format
  4. Monitor ZMQ socket state and connection status
  5. Check high water mark limits and queue status

  6. High message latency

  7. Reduce send timeout for faster failure detection
  8. Monitor network latency to ZMQ endpoints
  9. Check high water mark settings and queue buildup
  10. Consider using IPC for local communication

  11. Message loss

  12. PUB sockets drop messages if no subscribers connected
  13. PUSH sockets may drop messages if high water mark reached
  14. Monitor high water mark events and adjust limits
  15. Implement application-level acknowledgments if needed

Output Handler Issues

  1. Handler creation fails
  2. Check ZMQ library installation and version compatibility
  3. Verify URI format and transport availability
  4. Validate configuration parameters and socket types
  5. Monitor system resources and file descriptor limits

  6. Data not flowing through handler

  7. Verify sink name matches data source exactly
  8. Check if handler is enabled in configuration
  9. Monitor Output plugin event processing pipeline
  10. Validate data format and JSON serialization

  11. Performance degradation

  12. Monitor message queue sizes and memory usage
  13. Check network bandwidth and latency metrics
  14. Implement message batching if appropriate
  15. Consider horizontal scaling for high throughput

Debugging Tools

  1. ZMQ Monitoring:

    # Monitor ZMQ socket events (if compiled with monitoring)
    zmq_socket_monitor(socket, "inproc://monitor", ZMQ_EVENT_ALL);
    
    # Check ZMQ version
    zmq_version(&major, &minor, &patch);
    

  2. Network Testing:

    # Test TCP connectivity
    telnet zmq-server 5555
    
    # Monitor network traffic
    tcpdump -i any -n port 5555
    
    # Check socket statistics
    ss -tuln | grep :5555
    

  3. Message Testing:

    # Python ZMQ test subscriber
    import zmq
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b"analytics")
    
    while True:
        message = socket.recv_string()
        print(f"Received: {message}")
    

  4. Log Analysis:

  5. Enable ZMQ plugin debug logging
  6. Monitor Output plugin handler status
  7. Check system resource usage
  8. Analyze message delivery statistics

Integration Examples

Distributed Analytics Pipeline

{
  "output": {
    "handlers": {
      "raw-events": {
        "uri": "zmq://tcp://event-processor:5555",
        "sink": "events",
        "enabled": true,
        "config": {
          "socket_type": "PUSH",
          "high_water_mark": 10000,
          "send_timeout": 50
        }
      },
      "processed-alerts": {
        "uri": "zmq://tcp://*:5556", 
        "sink": "alerts",
        "enabled": true,
        "config": {
          "socket_type": "PUB",
          "topic": "alerts.security",
          "high_water_mark": 1000
        }
      }
    }
  }
}

Edge-to-Cloud Data Streaming

{
  "output": {
    "handlers": {
      "cloud-uplink": {
        "uri": "zmq://tcp://cloud-gateway:5555",
        "sink": "events",
        "enabled": true,
        "config": {
          "socket_type": "PUSH",
          "high_water_mark": 5000,
          "send_timeout": 1000,
          "reconnect_interval": 10000,
          "max_reconnect_attempts": -1
        }
      }
    }
  }
}

Multi-Consumer Broadcasting

{
  "output": {
    "handlers": {
      "analytics-broadcast": {
        "uri": "zmq://tcp://*:5555",
        "sink": "events", 
        "enabled": true,
        "config": {
          "socket_type": "PUB",
          "topic": "analytics",
          "high_water_mark": 2000,
          "linger": 5000
        }
      },
      "diagnostics-broadcast": {
        "uri": "zmq://tcp://*:5556",
        "sink": "diagnostics",
        "enabled": true,
        "config": {
          "socket_type": "PUB",
          "topic": "system",
          "high_water_mark": 500
        }
      }
    }
  }
}

Local Process Communication

# Start ZMQ subscriber for local testing
python3 -c "
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('ipc:///tmp/cvedia-analytics.ipc')
socket.setsockopt(zmq.SUBSCRIBE, b'analytics')

while True:
    topic, message = socket.recv_multipart()
    print(f'Topic: {topic.decode()}, Message: {message.decode()}')
"

See Also