ZMQ Input Plugin¶
Description¶
The ZMQ Input plugin provides high-performance data ingestion capabilities through ZeroMQ (ØMQ) messaging for CVEDIA-RT. It enables the system to receive real-time data streams, subscribe to distributed message publishers, and process incoming data from external ZeroMQ-enabled applications and sensors.
Key input capabilities include:
- ZeroMQ Integration: Native ZeroMQ messaging library integration for data input
- Multiple Input Patterns: Support for SUB (subscriber), PULL (pipeline), and REP (reply) socket patterns
- Real-time Data Streaming: Low-latency data ingestion from external sources
- Topic-based Filtering: Subscribe to specific data topics and message types
- Distributed Input: Receive data from multiple distributed publishers
- Asynchronous Processing: Non-blocking message reception and processing
- Format Flexibility: JSON-based message format with automatic deserialization
- Connection Management: Robust connection handling and reconnection logic
Requirements¶
Software Dependencies¶
- ZeroMQ Library (libzmq):
  - ZeroMQ 4.3.x or later (recommended)
  - ZMQ C++ bindings (zmq.hpp)
- CVEDIA-RT Core - Platform core functionality
- Network Libraries - TCP/UDP networking support
Platform Requirements¶
- Windows: ZeroMQ Windows packages or vcpkg installation
- Linux: ZeroMQ development packages (libzmq3-dev, libzmq5-dev)
- Network Configuration: Accessible network ports for ZMQ communication
Hardware Requirements¶
- Sufficient network bandwidth for expected data throughput
- Memory capacity for message buffering and processing
- CPU resources for message deserialization and processing
Configuration¶
Basic Input Configuration¶
{
  "zmq_server": "tcp://*:5555",
  "socket_type": "SUB",
  "topics": ["sensor_data", "camera_feed"],
  "buffer_size": 1000
}
Advanced Input Configuration¶
{
  "zmq_server": "tcp://192.168.1.100:5555",
  "socket_type": "PULL",
  "topics": ["alerts", "detections", "analytics"],
  "buffer_size": 5000,
  "timeout_ms": 1000,
  "reconnect_interval": 5000,
  "high_water_mark": 10000,
  "linger_time": 1000
}
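The `reconnect_interval` setting suggests a fixed delay between connection attempts. The sketch below shows that idea in plain C++ as an application-level retry loop. `connectWithRetry` and its parameters are hypothetical names for this illustration, not plugin API. Note that libzmq itself reconnects dropped connections in the background (see the `ZMQ_RECONNECT_IVL` socket option); whether the plugin's `reconnect_interval` maps onto that option or onto a loop like this one is not stated here.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: calls `tryConnect` until it succeeds or
// `maxAttempts` is reached, sleeping `interval` between attempts.
// Returns the number of attempts used on success, or 0 if all failed.
int connectWithRetry(const std::function<bool()>& tryConnect,
                     int maxAttempts,
                     std::chrono::milliseconds interval) {
    for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
        if (tryConnect()) {
            return attempt;
        }
        // Do not sleep after the final failed attempt.
        if (attempt < maxAttempts) {
            std::this_thread::sleep_for(interval);
        }
    }
    return 0;
}
```

A caller could pass a lambda that wraps the real connect call together with `std::chrono::milliseconds(5000)` to match the interval used in the configuration above.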
Multi-Source Input Configuration¶
{
  "inputs": [
    {
      "zmq_server": "tcp://sensor1.local:5555",
      "socket_type": "SUB",
      "topics": ["temperature", "humidity"]
    },
    {
      "zmq_server": "tcp://camera-server:5556",
      "socket_type": "PULL",
      "topics": ["frame_data"]
    }
  ],
  "buffer_size": 2000,
  "enable_filtering": true
}
Configuration Parameters¶
Parameter | Type | Default | Description |
---|---|---|---|
zmq_server | string | "tcp://*:5555" | ZMQ server endpoint for input connections |
socket_type | string | "SUB" | ZMQ socket pattern (SUB, PULL, REP) |
topics | array | [] | List of topics to subscribe to (SUB sockets only) |
buffer_size | integer | 1000 | Input message buffer size |
timeout_ms | integer | 1000 | Receive timeout in milliseconds |
reconnect_interval | integer | 5000 | Reconnection interval in milliseconds |
high_water_mark | integer | 1000 | Maximum queued messages per socket |
linger_time | integer | 0 | Socket linger time on close |
enable_filtering | boolean | false | Enable topic-based message filtering |
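To see how the parameters and their defaults fit together, the following sketch mirrors the table in a plain C++ struct and checks a few basic constraints. `ZmqInputSettings` and `validate` are hypothetical names used only for this illustration, and the validation rules are assumptions rather than documented plugin behavior.

```cpp
#include <string>
#include <vector>

// Hypothetical mirror of the configuration table; not the plugin's own type.
struct ZmqInputSettings {
    std::string zmq_server = "tcp://*:5555";
    std::string socket_type = "SUB";
    std::vector<std::string> topics;  // SUB sockets only
    int buffer_size = 1000;
    int timeout_ms = 1000;
    int reconnect_interval = 5000;    // milliseconds
    int high_water_mark = 1000;
    int linger_time = 0;
    bool enable_filtering = false;
};

// Returns a list of human-readable problems; an empty list means the
// settings pass these (assumed) sanity checks.
std::vector<std::string> validate(const ZmqInputSettings& s) {
    std::vector<std::string> problems;
    if (s.socket_type != "SUB" && s.socket_type != "PULL" && s.socket_type != "REP") {
        problems.push_back("socket_type must be SUB, PULL, or REP");
    }
    if (s.zmq_server.find("://") == std::string::npos) {
        problems.push_back("zmq_server must look like transport://address");
    }
    if (s.buffer_size <= 0) {
        problems.push_back("buffer_size must be positive");
    }
    if (s.high_water_mark < 0) {
        problems.push_back("high_water_mark must not be negative");
    }
    return problems;
}
```

Running such a check before connecting can catch a misspelled socket type or endpoint early, instead of surfacing it later as a connection failure.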
Supported ZMQ Input Patterns¶
SUB (Subscriber) Pattern¶
Use Case: Subscribe to data from one or more publishers
- Socket Type: ZMQ_SUB
- Connection: Connects to publisher endpoints
- Topics: Supports topic-based filtering
- Scalability: Many subscribers can connect to one publisher
{
  "socket_type": "SUB",
  "zmq_server": "tcp://publisher.example.com:5555",
  "topics": ["sensor_data", "alerts"]
}
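ZeroMQ SUB sockets filter by prefix: a subscription matches any message whose leading bytes equal the subscription string, an empty subscription matches everything, and a socket with no subscriptions receives nothing. The sketch below reproduces that rule in plain C++ so the effect of a `topics` list can be reasoned about. `passesSubscription` is a hypothetical helper, and it assumes the plugin passes each entry of `topics` to the socket unchanged.

```cpp
#include <string>
#include <vector>

// Returns true if `message` would pass a SUB socket holding the given
// subscriptions. Each subscription is compared against the leading bytes
// of the message; an empty subscription matches every message.
bool passesSubscription(const std::string& message,
                        const std::vector<std::string>& subscriptions) {
    for (const auto& prefix : subscriptions) {
        if (message.compare(0, prefix.size(), prefix) == 0) {
            return true;
        }
    }
    return false;  // no matching subscription: the message is not delivered
}
```

One consequence of prefix matching is that a subscription to `alerts` also accepts a message that begins with `alerts_high`, so topic names that share a prefix need care.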
PULL (Pipeline) Pattern¶
Use Case: Receive work items from pushers in a load-balanced manner, for example in distributed task processing
- Socket Type: ZMQ_PULL
- Connection: Binds to receive from multiple pushers
- Distribution: Upstream PUSH sockets distribute messages round-robin among connected pullers
{
  "socket_type": "PULL",
  "zmq_server": "tcp://*:5556"
}
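The round-robin distribution in the pipeline pattern can be pictured with a small simulation. The sketch below is plain C++ with no ZeroMQ calls. It shows which worker would receive which item if one PUSH socket fed `workerCount` PULL workers that are all connected and ready. `distributeRoundRobin` is a hypothetical name for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simulates how a single PUSH socket spreads work items across
// `workerCount` connected PULL workers in round-robin order.
std::vector<std::vector<std::string>> distributeRoundRobin(
        const std::vector<std::string>& items, std::size_t workerCount) {
    assert(workerCount > 0 && "at least one worker is required");
    std::vector<std::vector<std::string>> perWorker(workerCount);
    for (std::size_t i = 0; i < items.size(); ++i) {
        perWorker[i % workerCount].push_back(items[i]);
    }
    return perWorker;
}
```

With five items and two workers, the first worker receives items 1, 3, and 5 and the second receives items 2 and 4. In a live system the split can deviate from this when workers connect late or fall behind.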
REP (Reply) Pattern¶
Use Case: Receive requests and send replies (request-response pattern), for example as an API endpoint or for service queries
- Socket Type: ZMQ_REP
- Connection: Binds to receive requests from REQ sockets
- Protocol: Synchronous request-response messaging; each request must be answered before the next one can be received
{
  "socket_type": "REP",
  "zmq_server": "tcp://*:5557"
}
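A REP socket enforces strict alternation: receive a request, send a reply, receive again. The sketch below models that state machine in plain C++ so the ordering rule is explicit. `ReplyStateModel` is a hypothetical class for illustration only and performs no I/O.

```cpp
#include <stdexcept>

// Minimal model of the REP socket state machine: every received request
// must be answered before the next receive is allowed.
class ReplyStateModel {
public:
    void onReceive() {
        if (awaitingReply_) {
            throw std::logic_error("reply to the previous request first");
        }
        awaitingReply_ = true;
    }

    void onSend() {
        if (!awaitingReply_) {
            throw std::logic_error("no request is waiting for a reply");
        }
        awaitingReply_ = false;
    }

    bool awaitingReply() const { return awaitingReply_; }

private:
    bool awaitingReply_ = false;
};
```

In a service loop this means every code path that reads a request, including error paths, has to send some reply before reading again.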
API Reference¶
C++ API - ZMQCore Class¶
Connection Management¶
bool connect(const std::string& zmqServer); // Connect to ZMQ endpoint
bool disconnect(); // Disconnect from endpoint
bool isConnected(); // Check connection status
Input Operations¶
nlohmann::json readMessage(); // Read incoming message (blocking)
expected<pCValue> readMessage(); // Read message as CValue object
void eventHandler(pCValue val); // Handle incoming events
Configuration¶
struct config {
    std::string mqtt_server = ""; // ZMQ server endpoint
    std::string client_id = "";   // Client identifier
    std::string topic = "";       // Default topic filter
};
Lua API - ZMQ Input Interface¶
Connection Methods¶
-- Connect to ZMQ server for input
zmq:connect() -- Connect using configured server
zmq:connect(server_url) -- Connect to specific server
-- Check connection status
local connected = zmq:isConnected()
Message Reading¶
-- Read incoming messages
local message = zmq:readMessage() -- Read next message (blocking)
if message and message ~= "" then
    -- Process received message
    processIncomingData(message)
end
Configuration Management¶
-- Get current configuration
local config = zmq:getConfig()
-- Update configuration for input
zmq:saveConfig({
    zmq_server = "tcp://data-source:5555",
    socket_type = "SUB",
    topics = {"sensor_data", "camera_feed"}
})
Examples¶
Basic Data Subscription¶
-- Create ZMQ input instance
local zmq_input = api.factory.zmq.create(instance, "data_subscriber")
-- Configure for subscribing to sensor data
zmq_input:saveConfig({
    zmq_server = "tcp://sensor-hub:5555",
    socket_type = "SUB",
    topics = {"temperature", "pressure", "humidity"}
})
-- Connect and start receiving
zmq_input:connect()
-- Message processing loop
while true do
    local message = zmq_input:readMessage()
    if message and message ~= "" then
        -- Parse and process sensor data
        local data = json.decode(message)
        processSensorReading(data)
    end
end
Pipeline Worker Input¶
// Create ZMQ input for pipeline processing
ZMQCore inputWorker;
// Configure as pipeline worker (PULL socket)
inputWorker.pluginConf.mqtt_server = "tcp://*:5556";
inputWorker.connect("tcp://*:5556");
// Process incoming work items
while (inputWorker.isConnected()) {
    auto workItem = inputWorker.readMessage();
    if (!workItem.empty()) {
        // Process work item
        processWorkItem(workItem);
    }
}
Multi-Topic Subscriber with Filtering¶
-- Configure multi-topic subscriber
local subscriber = api.factory.zmq.create(instance, "multi_subscriber")
subscriber:saveConfig({
    zmq_server = "tcp://data-publisher:5555",
    socket_type = "SUB",
    topics = {"alerts", "detections", "analytics"},
    enable_filtering = true,
    buffer_size = 5000
})
subscriber:connect()
-- Message processing with topic filtering
function processMessage()
    local message = subscriber:readMessage()
    if message and message ~= "" then
        local data = json.decode(message)
        -- Route based on topic
        if data.topic == "alerts" then
            handleAlert(data.payload)
        elseif data.topic == "detections" then
            handleDetection(data.payload)
        elseif data.topic == "analytics" then
            handleAnalytics(data.payload)
        end
    end
end
Request-Response Input Service¶
// Create ZMQ service input (REP socket)
ZMQCore serviceInput;
serviceInput.connect("tcp://*:5557");
// Service request processing loop
while (serviceInput.isConnected()) {
    // Receive request
    auto request = serviceInput.readMessage();
    if (!request.empty()) {
        // Process request
        auto response = processServiceRequest(request);
        // Send reply (REP socket requirement)
        serviceInput.writeMessage("", response);
    }
}
Performance Considerations¶
Input Throughput Optimization¶
- Buffer Sizing: Configure appropriate buffer sizes based on expected message rates
- High Water Mark: Set high_water_mark to cap the number of messages queued per socket, so that traffic spikes cannot grow memory without bound
- Topic Filtering: Use specific topic subscriptions to reduce unnecessary data processing
- Asynchronous Processing: Process messages on separate threads to avoid blocking input
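Buffer sizing and the high water mark both come down to bounding a queue. The sketch below is a minimal fixed-capacity buffer in plain C++ that drops and counts new messages once it is full. `BoundedMessageBuffer` is a hypothetical class for illustration, the drop-newest policy is an assumption rather than documented plugin behavior, and the class is not thread-safe: a mutex would be needed if a receiver thread and a processing thread share it.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Fixed-capacity message buffer. When full, new messages are dropped and
// counted, which keeps memory bounded during traffic spikes.
class BoundedMessageBuffer {
public:
    explicit BoundedMessageBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Returns false if the message was dropped because the buffer is full.
    bool push(std::string message) {
        if (queue_.size() >= capacity_) {
            ++dropped_;
            return false;
        }
        queue_.push_back(std::move(message));
        return true;
    }

    // Moves the oldest message into `out`; returns false if the buffer is empty.
    bool pop(std::string& out) {
        if (queue_.empty()) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t capacity_;
    std::deque<std::string> queue_;
    std::size_t dropped_ = 0;
};
```

Watching the drop counter over time gives a simple signal for whether the buffer is too small or the processing side is too slow.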
Memory Management¶
- Message Buffering: Monitor memory usage with high-volume message streams
- Buffer Limits: Implement buffer size limits to prevent out-of-memory conditions
- Garbage Collection: Regular cleanup of processed messages and connections
- Connection Pooling: Reuse connections when possible to reduce overhead
Network Optimization¶
- Connection Types: Choose appropriate transport (TCP, IPC, inproc) based on use case
- Compression: Consider message compression for large payloads
- Batching: Process messages in batches when possible for better throughput
- Keep-Alive: Configure appropriate heartbeat and keep-alive settings
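ZeroMQ endpoints take the form `transport://address`, so the transport in use can be read directly from the configured endpoint string. The sketch below extracts it in plain C++ and flags the transports that stay on the local machine. `transportOf` and `isLocalTransport` are hypothetical helpers for illustration; which transports the plugin accepts in `zmq_server` beyond TCP is an assumption.

```cpp
#include <string>

// Extracts the transport ("tcp", "ipc", "inproc", ...) from an endpoint
// of the form transport://address. Returns an empty string if the
// endpoint has no "://" separator.
std::string transportOf(const std::string& endpoint) {
    const std::string::size_type pos = endpoint.find("://");
    return pos == std::string::npos ? std::string() : endpoint.substr(0, pos);
}

// True for transports that stay on the local machine and bypass the
// network stack.
bool isLocalTransport(const std::string& endpoint) {
    const std::string transport = transportOf(endpoint);
    return transport == "ipc" || transport == "inproc";
}
```

A check like this can be used to log a hint when a publisher and the plugin run on the same host but are still configured to talk over TCP.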
Troubleshooting¶
Input Connection Issues¶
Cannot connect to ZMQ publisher
- Verify the publisher is running and accessible
- Check network connectivity and firewall settings
- Ensure the ZMQ server endpoint is correctly configured
- Validate port availability and permissions
Messages not being received
- Check topic subscription configuration (SUB sockets)
- Verify the publisher is sending to subscribed topics
- Monitor network traffic and ZMQ socket status
- Check buffer sizes and high water mark settings
Performance Issues¶
High message latency
- Reduce buffer sizes for lower latency
- Optimize message processing logic
- Consider using faster transport methods (IPC vs TCP)
- Monitor system resources and network bandwidth
Memory usage growing continuously
- Implement proper message buffer limits
- Check for message processing bottlenecks
- Monitor garbage collection and cleanup processes
- Adjust high water mark settings
Data Processing Issues¶
Message format errors
- Validate JSON message structure and encoding
- Check for message truncation or corruption
- Verify data serialization/deserialization logic
- Monitor message size limits and constraints
Missing or duplicate messages
- Check ZMQ socket patterns and message delivery guarantees
- Implement message acknowledgment where needed
- Monitor network reliability and connection stability
- Consider implementing message sequence numbers
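Sequence numbers make missing and duplicate messages detectable on the receiving side. The sketch below is a minimal tracker in plain C++. `SequenceTracker` and `SequenceStatus` are hypothetical names, and the code assumes each publisher stamps its messages with a counter that starts at 1 and increases by 1, which is not something the plugin is documented to provide.

```cpp
#include <cstdint>

enum class SequenceStatus { InOrder, Duplicate, Gap };

// Tracks the last sequence number seen from one publisher and classifies
// each newly observed number.
class SequenceTracker {
public:
    SequenceStatus observe(std::uint64_t seq) {
        if (seq <= last_) {
            return SequenceStatus::Duplicate;  // already seen, or stale
        }
        const bool gap = seq != last_ + 1;
        if (gap) {
            missed_ += seq - last_ - 1;  // count the skipped numbers
        }
        last_ = seq;
        return gap ? SequenceStatus::Gap : SequenceStatus::InOrder;
    }

    std::uint64_t missed() const { return missed_; }

private:
    std::uint64_t last_ = 0;
    std::uint64_t missed_ = 0;
};
```

One tracker per publisher is needed, since sequence numbers from different sources are independent of each other.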
Error Messages¶
- "ZMQ Connection failed": Check server availability and network connectivity
- "Socket bind failed": Verify port availability and permissions
- "Message buffer overflow": Increase buffer size or improve processing speed
- "Topic subscription failed": Check topic name format and publisher configuration