ZMQ Plugin
Description
The ZMQ plugin provides high-performance messaging integration using the ZeroMQ protocol through the Output plugin system. It registers the zmq:// URI scheme and serves as an output handler for streaming analytics data to ZeroMQ consumers. The plugin supports the PUB, PUSH, and REQ socket patterns and provides scalable, low-latency data distribution.
Key Features
- Output Handler Integration: Registers the zmq:// URI scheme with the Output plugin
- Multiple ZMQ Patterns: PUB, PUSH, and REQ socket types for different messaging patterns
- High Performance: Asynchronous, non-blocking message publishing
- JSON Message Formatting: Structured data export with automatic serialization
- Event-driven Architecture: Real-time analytics streaming integration
- Connection Management: Automatic connection handling and error recovery
- Distributed Messaging: Scalable messaging for distributed systems
- Topic-based Routing: Flexible topic-based message distribution
- TCP/IPC/InProc Transport: Support for multiple transport protocols
When to Use
- Real-time analytics data streaming to distributed systems
- High-frequency data publishing with low latency requirements
- Microservices event-driven architectures
- Integration with ZMQ-based message brokers and consumers
- Scalable data distribution for large deployments
- Inter-process communication (IPC) for local system integration
- Queue-based data processing workflows
- Edge-to-cloud data streaming applications
Requirements
Software Dependencies
- ZeroMQ library (libzmq) version 4.0 or higher
- Network connectivity for TCP transport
- IPC support for inter-process communication
- C++ runtime compatible with ZMQ bindings
Network Requirements
- TCP/IP connectivity for remote ZMQ endpoints
- Ports: user-configurable; ZeroMQ defines no default port (the examples on this page use 5555, 5556, and so on)
- Firewall rules allowing outbound connections
- Low-latency network for real-time streaming applications
Configuration
The ZMQ plugin is configured through the Output plugin system using the zmq:// URI scheme.
Basic Configuration
{
"output": {
"handlers": {
"zmq-publisher": {
"uri": "zmq://tcp://*:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUB",
"topic": "analytics.events",
"high_water_mark": 1000
}
}
}
}
}
Advanced Configuration
{
"output": {
"handlers": {
"zmq-analytics": {
"uri": "zmq://tcp://analytics-server:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUSH",
"topic": "analytics.detections",
"high_water_mark": 5000,
"linger": 1000,
"send_timeout": 100,
"reconnect_interval": 5000,
"max_reconnect_attempts": 10,
"queue_size": 10000
}
}
}
}
}
Multi-Pattern Configuration
{
"output": {
"handlers": {
"zmq-publisher": {
"uri": "zmq://tcp://*:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUB",
"topic": "analytics.events",
"high_water_mark": 1000
}
},
"zmq-worker": {
"uri": "zmq://tcp://work-queue:5556",
"sink": "processing_tasks",
"enabled": true,
"config": {
"socket_type": "PUSH",
"high_water_mark": 5000,
"send_timeout": 50
}
},
"zmq-request": {
"uri": "zmq://tcp://service-endpoint:5557",
"sink": "service_requests",
"enabled": true,
"config": {
"socket_type": "REQ",
"request_timeout": 5000,
"retries": 3
}
}
}
}
}
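As a pre-flight check for configurations like the ones above, the following sketch looks for enabled handlers that share a wildcard endpoint. It assumes that a `*` host means the handler binds (as the URI examples on this page suggest), in which case a second bind on the same port would be expected to fail with "Address already in use". The function name `find_duplicate_binds` and the extra `zmq-extra` handler are hypothetical and not part of the plugin.

```python
import json
from collections import defaultdict


def find_duplicate_binds(config):
    """Return {uri: [handler names]} for wildcard URIs used by several enabled handlers."""
    by_uri = defaultdict(list)
    handlers = config.get("output", {}).get("handlers", {})
    for name, handler in handlers.items():
        # Assumption: only wildcard ("*") URIs bind; other URIs connect and may be shared.
        if handler.get("enabled", True) and "*" in handler["uri"]:
            by_uri[handler["uri"]].append(name)
    return {uri: names for uri, names in by_uri.items() if len(names) > 1}


example = json.loads("""
{
  "output": {"handlers": {
    "zmq-publisher": {"uri": "zmq://tcp://*:5555", "sink": "events"},
    "zmq-worker": {"uri": "zmq://tcp://work-queue:5556", "sink": "processing_tasks"},
    "zmq-extra": {"uri": "zmq://tcp://*:5555", "sink": "diagnostics"}
  }}
}
""")

print(find_duplicate_binds(example))
```

Two handlers that connect to the same remote endpoint are not flagged, since several connecting sockets can share one listener.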
Configuration Schema
Parameter | Type | Default | Description |
---|---|---|---|
uri | string | required | ZMQ endpoint URI (zmq://transport://address:port) |
sink | string | "events" | Data sink to connect to |
enabled | boolean | true | Enable/disable ZMQ output |
config.socket_type | string | "PUSH" | ZMQ socket type (PUB, PUSH, REQ) |
config.topic | string | "" | Topic prefix for PUB sockets |
config.high_water_mark | integer | 1000 | Maximum queued messages |
config.linger | integer | 1000 | Socket linger time in milliseconds |
config.send_timeout | integer | 100 | Send operation timeout in milliseconds |
config.reconnect_interval | integer | 5000 | Reconnection interval in milliseconds |
config.max_reconnect_attempts | integer | -1 | Maximum reconnection attempts (-1 = infinite) |
config.queue_size | integer | 1000 | Internal message queue size |
config.request_timeout | integer | 5000 | Request timeout for REQ sockets (milliseconds) |
config.retries | integer | 3 | Number of retries for REQ sockets |
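To make the defaults in the schema concrete, here is a minimal sketch that merges a handler definition with those defaults and rejects obviously invalid values. The default values are copied from the table; the function `resolve_handler` and the decision to reject unknown keys are assumptions for illustration, and the plugin's own validation may behave differently.

```python
# Defaults copied from the Configuration Schema table.
CONFIG_DEFAULTS = {
    "socket_type": "PUSH",
    "topic": "",
    "high_water_mark": 1000,
    "linger": 1000,
    "send_timeout": 100,
    "reconnect_interval": 5000,
    "max_reconnect_attempts": -1,
    "queue_size": 1000,
    "request_timeout": 5000,
    "retries": 3,
}
SOCKET_TYPES = {"PUB", "PUSH", "REQ"}


def resolve_handler(handler):
    """Return a copy of the handler with schema defaults filled in."""
    if "uri" not in handler:
        raise ValueError("uri is required")
    overrides = handler.get("config", {})
    unknown = set(overrides) - set(CONFIG_DEFAULTS)
    if unknown:
        # Assumption: unknown keys are treated as mistakes (likely typos).
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    config = {**CONFIG_DEFAULTS, **overrides}
    if config["socket_type"] not in SOCKET_TYPES:
        raise ValueError(f"unsupported socket_type: {config['socket_type']}")
    return {
        "uri": handler["uri"],
        "sink": handler.get("sink", "events"),
        "enabled": handler.get("enabled", True),
        "config": config,
    }


resolved = resolve_handler({"uri": "zmq://tcp://*:5555", "config": {"socket_type": "PUB"}})
print(resolved["sink"], resolved["config"]["socket_type"], resolved["config"]["high_water_mark"])
```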
Supported ZMQ Patterns
Publisher (PUB)
Publisher pattern for one-to-many data distribution. Subscribers connect and receive messages matching their subscriptions.
{
"config": {
"socket_type": "PUB",
"topic": "analytics.detections",
"high_water_mark": 1000
}
}
Use cases:
- Broadcasting analytics events to multiple consumers
- Real-time data feeds for monitoring systems
- Event notification distribution
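ZeroMQ SUB sockets filter by byte prefix: a subscription matches every message whose leading bytes equal the subscription. The sketch below models only that rule so the effect of a `topic` value on subscribers is easier to reason about. The helper name `subscription_matches` is hypothetical.

```python
def subscription_matches(subscription: bytes, topic: bytes) -> bool:
    """Model of ZeroMQ's SUB filter: a plain prefix comparison on bytes."""
    return topic.startswith(subscription)


published = b"analytics.detections"

for subscription in (b"", b"analytics", b"analytics.detections", b"detections", b"Analytics"):
    result = subscription_matches(subscription, published)
    print(f"{subscription!r:26} -> {result}")
```

An empty subscription receives everything, matching is case-sensitive, and a subscription such as `detections` does not match because it is not a prefix of the published topic.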
Push (PUSH)
Push pattern for distributing work to multiple workers. Messages are load-balanced across connected workers.
{
"config": {
"socket_type": "PUSH",
"high_water_mark": 5000,
"send_timeout": 100
}
}
Use cases:
- Work distribution to processing workers
- Load balancing analytics tasks
- Queue-based processing workflows
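The load balancing mentioned above is round-robin across the connected PULL peers. The following sketch simulates that distribution without any sockets, which can help when estimating how tasks spread over a worker pool. The function `distribute` and the worker names are hypothetical, and real sockets also skip peers whose queues are full, which this model ignores.

```python
from itertools import cycle


def distribute(messages, workers):
    """Assign messages to workers in round-robin order, as a PUSH socket would."""
    assignment = {worker: [] for worker in workers}
    for message, worker in zip(messages, cycle(workers)):
        assignment[worker].append(message)
    return assignment


tasks = list(range(7))
print(distribute(tasks, ["worker-a", "worker-b", "worker-c"]))
```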
Request (REQ)
Request pattern for client-server communication. Each message expects a reply from the server.
{
"config": {
"socket_type": "REQ",
"request_timeout": 5000,
"retries": 3
}
}
Use cases:
- Service API calls
- Command-response communication
- Synchronous data requests
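How `request_timeout` and `retries` interact is not spelled out here, so the sketch below shows one plausible reading: one initial attempt plus up to `retries` further attempts, each bounded by the timeout. The callable `send_request` is hypothetical and stands in for "open a socket, send, wait for the reply". With real REQ sockets each retry needs a fresh socket, because a REQ socket that has sent a request cannot send again until it receives a reply.

```python
def request_with_retries(send_request, retries=3, request_timeout_ms=5000):
    """Call send_request(timeout_seconds) until it returns or attempts run out.

    Assumption: `retries` counts attempts made after the first one fails.
    """
    attempts = retries + 1
    last_error = None
    for _ in range(attempts):
        try:
            return send_request(request_timeout_ms / 1000.0)
        except TimeoutError as exc:
            last_error = exc
    raise TimeoutError(f"no reply after {attempts} attempts") from last_error


# Hypothetical service that times out twice before answering.
calls = {"count": 0}

def flaky_service(timeout_seconds):
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return {"status": "ok", "timeout_used": timeout_seconds}


print(request_with_retries(flaky_service, retries=3, request_timeout_ms=5000))
```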
URI Scheme
The ZMQ plugin registers the zmq:// URI scheme with the Output plugin system.
URI Format
zmq://transport://address:port
Supported Transports:
- tcp: TCP transport for network communication
- ipc: Inter-process communication (Unix domain sockets); the address is a filesystem path and takes no port
- inproc: In-process communication (thread-to-thread); the address is a name and takes no port
URI Examples
zmq://tcp://*:5555 # Bind to all interfaces, port 5555
zmq://tcp://localhost:5555 # Connect to localhost
zmq://tcp://analytics-server:5556 # Connect to remote server
zmq://ipc:///tmp/analytics.ipc # IPC socket (Unix)
zmq://inproc://analytics # In-process communication
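The URI layout above can be taken apart with a few string operations. This is a minimal sketch, not the plugin's parser: `parse_zmq_uri` and `ZmqEndpoint` are hypothetical names, and the rule that a `*` address means "bind" while anything else means "connect" is inferred from the comments in the examples.

```python
from typing import NamedTuple, Optional


class ZmqEndpoint(NamedTuple):
    transport: str
    address: str
    port: Optional[int]
    endpoint: str   # the inner ZeroMQ endpoint, e.g. "tcp://*:5555"
    bind: bool      # assumption: wildcard address means bind, otherwise connect


def parse_zmq_uri(uri: str) -> ZmqEndpoint:
    prefix = "zmq://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a zmq:// URI: {uri}")
    endpoint = uri[len(prefix):]
    transport, separator, rest = endpoint.partition("://")
    if not separator or transport not in ("tcp", "ipc", "inproc"):
        raise ValueError(f"unsupported transport in: {uri}")
    address, port = rest, None
    if transport == "tcp":
        # Only tcp endpoints carry a port; ipc uses a path, inproc a name.
        address, separator, port_text = rest.rpartition(":")
        if not separator or not port_text.isdigit():
            raise ValueError(f"tcp endpoint needs address:port in: {uri}")
        port = int(port_text)
    return ZmqEndpoint(transport, address, port, endpoint, bind=(address == "*"))


for uri in ("zmq://tcp://*:5555", "zmq://tcp://localhost:5555", "zmq://ipc:///tmp/analytics.ipc"):
    print(parse_zmq_uri(uri))
```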
API Reference
C++ API
The ZMQ plugin implements both the standalone iface::ZMQ interface and the iface::OutputHandler interface:
// Output Handler Interface (registered with Output plugin)
class ZMQOutputHandler : public iface::OutputHandler {
public:
// Constructor for output handler
ZMQOutputHandler(const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& uri,
pCValue config);
// Factory method (registered with Output plugin)
static std::shared_ptr<iface::OutputHandler> create(
const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// Output handler methods
expected<bool> write(pCValue sinkData = VAL(),
std::string dynamicPath = "") override;
void stop() override;
void close() override;
std::string getSink() override;
};
// Standalone ZMQ Interface
class ZMQImpl : public iface::ZMQ {
public:
// Connection methods
bool connect();
bool connect(const std::string& zmqServer);
bool disconnect();
bool isConnected() const;
// Message methods
expected<pCValue> readMessage();
void writeMessage(const std::string& topic, pCValue msg);
// Event handling
void eventHandler(pCValue val);
// Configuration methods
bool setConfig(pCValue config);
pCValue getConfig() const;
};
Lua API
The ZMQ plugin is accessible both through the Output plugin system and through a standalone factory:
-- Via Output plugin system (recommended for data export)
local output = api.factory.output.create(instance, "ZMQExporter")
-- Add ZMQ output handler
local zmqHandler = output:addHandler(
"zmq-publisher",
"zmq://tcp://*:5555",
"events",
{
socket_type = "PUB",
topic = "analytics.events",
high_water_mark = 1000
}
)
-- Standalone ZMQ client (for direct control)
local zmq = api.factory.zmq.create(instance, "ZMQClient")
-- Connection methods
zmq:connect() -- Uses configuration
zmq:connect("zmq://tcp://localhost:5555") -- Direct connection
-- Message methods
local message = zmq:readMessage() -- Returns CValue
zmq:writeMessage("topic", message) -- Send CValue
-- Configuration
local config = zmq:getConfig()
zmq:saveConfig(newConfig)
Examples
Basic Event Publishing
-- Create output instance for ZMQ publishing
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQEventPublisher")
-- Configure ZMQ publisher for analytics events
local pubConfig = {
socket_type = "PUB",
topic = "analytics.events",
high_water_mark = 1000,
linger = 1000
}
local pubHandler = output:addHandler(
"event-publisher",
"zmq://tcp://*:5555",
"events",
pubConfig
)
if pubHandler then
api.logging.LogInfo("ZMQ event publishing configured")
api.logging.LogInfo("Socket: PUB on tcp://*:5555")
api.logging.LogInfo("Topic: analytics.events")
api.logging.LogInfo("High Water Mark: 1000")
-- Events from the "events" sink will be automatically published
-- Subscribers can connect to tcp://your-server:5555
else
api.logging.LogError("Failed to create ZMQ publisher handler")
end
Work Distribution with PUSH Pattern
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQWorkDistributor")
-- Configure ZMQ PUSH socket for work distribution
local pushConfig = {
socket_type = "PUSH",
high_water_mark = 5000,
send_timeout = 100,
reconnect_interval = 5000
}
local pushHandler = output:addHandler(
"work-distributor",
"zmq://tcp://work-queue:5556",
"processing_tasks",
pushConfig
)
if pushHandler then
api.logging.LogInfo("ZMQ work distribution configured")
api.logging.LogInfo("Socket: PUSH to tcp://work-queue:5556")
api.logging.LogInfo("Processing tasks will be distributed to workers")
-- Workers should use PULL sockets to receive work
-- Load balancing is handled automatically by ZMQ
else
api.logging.LogError("Failed to create ZMQ work distributor")
end
Multi-Pattern Data Streaming
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQMultiStream")
-- Configure multiple ZMQ patterns for different data types
local zmqConfigs = {
{
name = "event-broadcast",
uri = "zmq://tcp://*:5555",
sink = "events",
config = {
socket_type = "PUB",
topic = "analytics.events",
high_water_mark = 1000
}
},
{
name = "work-distribution",
uri = "zmq://tcp://work-queue:5556",
sink = "processing_tasks",
config = {
socket_type = "PUSH",
high_water_mark = 5000,
send_timeout = 50
}
},
{
name = "diagnostics-feed",
uri = "zmq://tcp://*:5557",
sink = "diagnostics",
config = {
socket_type = "PUB",
topic = "system.diagnostics",
high_water_mark = 500
}
}
}
-- Create all ZMQ handlers
local activeHandlers = {}
for _, config in ipairs(zmqConfigs) do
local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
if handler then
table.insert(activeHandlers, config.name)
api.logging.LogInfo("Created ZMQ handler: " .. config.name)
api.logging.LogInfo(" Pattern: " .. config.config.socket_type)
api.logging.LogInfo(" URI: " .. config.uri)
if config.config.topic then
api.logging.LogInfo(" Topic: " .. config.config.topic)
end
else
api.logging.LogError("Failed to create handler: " .. config.name)
end
end
api.logging.LogInfo("Active ZMQ handlers: " .. #activeHandlers)
IPC Communication
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQLocalIPC")
-- Configure IPC communication for local processes
local ipcConfig = {
socket_type = "PUB",
topic = "local.analytics",
high_water_mark = 2000,
linger = 500
}
local ipcHandler = output:addHandler(
"local-ipc",
"zmq://ipc:///tmp/cvedia-analytics.ipc",
"events",
ipcConfig
)
if ipcHandler then
api.logging.LogInfo("ZMQ IPC communication configured")
api.logging.LogInfo("Socket: /tmp/cvedia-analytics.ipc")
api.logging.LogInfo("Local processes can subscribe using:")
api.logging.LogInfo(" zmq://ipc:///tmp/cvedia-analytics.ipc")
-- Other processes on the same machine can connect via IPC
-- More efficient than TCP for local communication
else
api.logging.LogError("Failed to create ZMQ IPC handler")
end
Request-Response Service Integration
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "ZMQServiceClient")
-- Configure REQ socket for service communication
local reqConfig = {
socket_type = "REQ",
request_timeout = 5000,
retries = 3,
reconnect_interval = 2000
}
local reqHandler = output:addHandler(
"service-client",
"zmq://tcp://api-service:5558",
"service_requests",
reqConfig
)
if reqHandler then
api.logging.LogInfo("ZMQ service client configured")
api.logging.LogInfo("Socket: REQ to tcp://api-service:5558")
api.logging.LogInfo("Request timeout: 5000ms")
api.logging.LogInfo("Max retries: 3")
-- Service requests from "service_requests" sink
-- will be sent to the service and responses handled
else
api.logging.LogError("Failed to create ZMQ service client")
end
Standalone ZMQ Usage
-- For direct ZMQ control (not through Output plugin)
local instance = api.thread.getCurrentInstance()
local zmq = api.factory.zmq.create(instance, "DirectZMQ")
-- Connect to ZMQ endpoint
local success = zmq:connect("zmq://tcp://localhost:5555")
if not success then
api.logging.LogError("Failed to connect to ZMQ endpoint")
return
end
-- Create custom message
local customMessage = {
timestamp = os.time(),
source = "cvedia-rt",
data = "Custom analytics data",
metadata = {
camera_id = "cam-001",
event_type = "custom_event"
}
}
-- Send message (will be JSON serialized)
zmq:writeMessage("custom.topic", customMessage)
-- For subscriber pattern, you would typically read messages
local receivedMessage = zmq:readMessage()
if receivedMessage then
api.logging.LogInfo("Received: " .. receivedMessage:str())
end
Performance Considerations
Message Throughput
- High Water Mark: Set appropriate limits to prevent memory buildup
  - Use higher values for burst scenarios
  - Monitor memory usage with high throughput
  - Consider disk swapping for very large queues
- Send Timeout: Balance responsiveness vs reliability
  - Shorter timeouts for real-time applications
  - Longer timeouts for guaranteed delivery
  - Monitor timeout events and adjust accordingly
- Socket Types: Choose optimal pattern for your use case
  - PUB/SUB for broadcast scenarios
  - PUSH/PULL for work distribution
  - REQ/REP for request-response patterns
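To get a feel for how a high water mark trades memory for loss during bursts, the sketch below models a bounded send queue that drops new messages once it is full, which is how a PUB socket treats a slow subscriber. It is a simplified model: `BoundedSendQueue` is a hypothetical class, and libzmq's real accounting is approximate and split between sender and receiver buffers.

```python
from collections import deque


class BoundedSendQueue:
    """Toy model of a send queue with a high water mark (drop-on-full)."""

    def __init__(self, high_water_mark):
        self.high_water_mark = high_water_mark
        self.pending = deque()
        self.dropped = 0

    def offer(self, message):
        """Queue a message, or count it as dropped when the queue is full."""
        if len(self.pending) >= self.high_water_mark:
            self.dropped += 1
            return False
        self.pending.append(message)
        return True

    def drain(self, count):
        """Simulate the network sending up to `count` queued messages."""
        sent = []
        while self.pending and len(sent) < count:
            sent.append(self.pending.popleft())
        return sent


queue = BoundedSendQueue(high_water_mark=1000)
for sequence in range(1500):      # a burst larger than the high water mark
    queue.offer(sequence)
print(len(queue.pending), queue.dropped)
```

Raising the limit absorbs larger bursts at the cost of memory proportional to the queue length times the message size.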
Network Optimization
- Transport Selection:
  - TCP for network communication
  - IPC for local processes (higher performance)
  - InProc for thread-to-thread communication
- Connection Management:
  - Enable reconnection for production deployments
  - Use exponential backoff for retry intervals
  - Monitor connection status and log events
- Message Size:
  - Keep messages under 1 MB for optimal performance
  - Use compression for large payloads
  - Consider message batching for small, frequent messages
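The exponential backoff recommended above can be sketched as a delay schedule. The base of 5000 ms mirrors the `reconnect_interval` default; the cap `max_ms`, the growth factor, and the jitter are hypothetical parameters that this page's schema does not define. libzmq itself offers a comparable mechanism through `ZMQ_RECONNECT_IVL` and `ZMQ_RECONNECT_IVL_MAX`, though whether the plugin exposes the latter is not stated here.

```python
import random


def backoff_schedule(base_ms=5000, factor=2.0, max_ms=60000, attempts=10,
                     jitter=0.0, rng=None):
    """Return reconnect delays in milliseconds, growing geometrically up to max_ms.

    jitter is a fraction (0.1 = +/-10%) used to avoid synchronized reconnects.
    """
    rng = rng or random.Random()
    delay = float(base_ms)
    schedule = []
    for _ in range(attempts):
        capped = min(delay, float(max_ms))
        spread = capped * jitter
        schedule.append(int(capped + rng.uniform(-spread, spread)))
        delay *= factor
    return schedule


print(backoff_schedule(attempts=5))
print(backoff_schedule(attempts=5, jitter=0.1, rng=random.Random(42)))
```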
Resource Management
- Memory Usage:
  - Monitor queue sizes and memory consumption
  - Set appropriate high water marks
  - Implement message flow control if needed
- CPU Usage:
  - ZMQ is generally CPU-efficient
  - Monitor JSON serialization overhead
  - Consider message format optimization
- File Descriptors:
  - Each ZMQ socket uses file descriptors
  - Monitor system limits for high-connection scenarios
  - Clean up unused connections properly
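When weighing JSON overhead against payload size, it can help to measure both directly. The sketch below encodes a message as compact JSON and compresses it with zlib once it passes a size threshold. The one-byte tag framing, the threshold, and the function names are hypothetical and apply to application code on both ends; they do not describe the plugin's wire format.

```python
import json
import zlib


def encode(message, compress_threshold=1024):
    """Serialize to compact JSON; compress payloads at or above the threshold."""
    raw = json.dumps(message, separators=(",", ":")).encode("utf-8")
    if len(raw) >= compress_threshold:
        return b"Z" + zlib.compress(raw)   # "Z" tag: zlib-compressed JSON
    return b"J" + raw                      # "J" tag: plain JSON


def decode(payload):
    """Inverse of encode(); raises ValueError on an unknown tag."""
    tag, body = payload[:1], payload[1:]
    if tag == b"Z":
        body = zlib.decompress(body)
    elif tag != b"J":
        raise ValueError(f"unknown payload tag: {tag!r}")
    return json.loads(body.decode("utf-8"))


small = {"camera_id": "cam-001", "event_type": "custom_event"}
large = {"camera_id": "cam-001", "detections": [{"label": "person", "score": 0.9}] * 200}

for message in (small, large):
    payload = encode(message)
    print(payload[:1], len(json.dumps(message)), "->", len(payload))
```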
Troubleshooting
Connection Issues
- "Address already in use"
  - Check if another process is using the port
  - Use netstat -an | grep :5555 to verify port usage
  - Consider using different ports or SO_REUSEADDR
  - Ensure proper socket cleanup on application restart
- "Connection refused"
  - Verify ZMQ endpoint is listening: telnet host port
  - Check network connectivity and firewall rules
  - Validate URI format and transport type
  - Ensure ZMQ server is started before client connection
- "Context terminated"
  - ZMQ context was shut down while socket was active
  - Ensure proper shutdown sequence (sockets before context)
  - Check for application termination issues
  - Monitor context lifecycle in multi-threaded scenarios
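Where telnet or netstat are not available, the same TCP reachability check can be scripted with the standard library. This sketch only confirms that something accepts TCP connections on the port; it does not verify that the listener speaks ZeroMQ. The function name `tcp_endpoint_reachable` is hypothetical.

```python
import socket


def tcp_endpoint_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


print(tcp_endpoint_reachable("localhost", 5555, timeout=1.0))
```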
Message Delivery Issues
- Messages not being sent
  - Check Output plugin handler registration and status
  - Verify sink data is available and in correct format
  - Monitor ZMQ socket state and connection status
  - Check high water mark limits and queue status
- High message latency
  - Reduce send timeout for faster failure detection
  - Monitor network latency to ZMQ endpoints
  - Check high water mark settings and queue buildup
  - Consider using IPC for local communication
- Message loss
  - PUB sockets drop messages if no subscribers connected
  - PUSH sockets may drop messages if high water mark reached
  - Monitor high water mark events and adjust limits
  - Implement application-level acknowledgments if needed
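A lightweight first step toward application-level acknowledgments is to detect loss on the consumer side. The sketch below assumes each message carries a monotonically increasing sequence number; that field is hypothetical and would have to be added by whatever produces the sink data, since nothing on this page says the plugin adds one. `GapDetector` is likewise a hypothetical name.

```python
class GapDetector:
    """Counts messages missing from a stream of increasing sequence numbers."""

    def __init__(self):
        self.expected = None   # next sequence number we expect to see
        self.lost = 0          # running total of missing messages

    def observe(self, sequence):
        """Record one received sequence number; return how many were skipped."""
        missed = 0
        if self.expected is not None and sequence > self.expected:
            missed = sequence - self.expected
        if self.expected is None or sequence >= self.expected:
            self.expected = sequence + 1
        # Older or duplicate numbers are ignored rather than counted as loss.
        self.lost += missed
        return missed


detector = GapDetector()
for received in (1, 2, 5, 6, 9):
    detector.observe(received)
print(detector.lost)
```

A consumer that sees gaps can then request a resend over a separate channel, which is where a REQ handler or another service would come in.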
Output Handler Issues
- Handler creation fails
  - Check ZMQ library installation and version compatibility
  - Verify URI format and transport availability
  - Validate configuration parameters and socket types
  - Monitor system resources and file descriptor limits
- Data not flowing through handler
  - Verify sink name matches data source exactly
  - Check if handler is enabled in configuration
  - Monitor Output plugin event processing pipeline
  - Validate data format and JSON serialization
- Performance degradation
  - Monitor message queue sizes and memory usage
  - Check network bandwidth and latency metrics
  - Implement message batching if appropriate
  - Consider horizontal scaling for high throughput
Debugging Tools
- ZMQ Monitoring:
  // Monitor ZMQ socket events (if compiled with monitoring)
  zmq_socket_monitor(socket, "inproc://monitor", ZMQ_EVENT_ALL);
  // Check ZMQ version
  int major, minor, patch;
  zmq_version(&major, &minor, &patch);
- Network Testing:
  # Test TCP connectivity
  telnet zmq-server 5555
  # Monitor network traffic
  tcpdump -i any -n port 5555
  # Check socket statistics
  ss -tuln | grep :5555
- Message Testing:
  # Python ZMQ test subscriber
  import zmq
  context = zmq.Context()
  socket = context.socket(zmq.SUB)
  socket.connect("tcp://localhost:5555")
  socket.setsockopt(zmq.SUBSCRIBE, b"analytics")
  while True:
      message = socket.recv_string()
      print(f"Received: {message}")
- Log Analysis:
  - Enable ZMQ plugin debug logging
  - Monitor Output plugin handler status
  - Check system resource usage
  - Analyze message delivery statistics
Integration Examples
Distributed Analytics Pipeline
{
"output": {
"handlers": {
"raw-events": {
"uri": "zmq://tcp://event-processor:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUSH",
"high_water_mark": 10000,
"send_timeout": 50
}
},
"processed-alerts": {
"uri": "zmq://tcp://*:5556",
"sink": "alerts",
"enabled": true,
"config": {
"socket_type": "PUB",
"topic": "alerts.security",
"high_water_mark": 1000
}
}
}
}
}
Edge-to-Cloud Data Streaming
{
"output": {
"handlers": {
"cloud-uplink": {
"uri": "zmq://tcp://cloud-gateway:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUSH",
"high_water_mark": 5000,
"send_timeout": 1000,
"reconnect_interval": 10000,
"max_reconnect_attempts": -1
}
}
}
}
}
Multi-Consumer Broadcasting
{
"output": {
"handlers": {
"analytics-broadcast": {
"uri": "zmq://tcp://*:5555",
"sink": "events",
"enabled": true,
"config": {
"socket_type": "PUB",
"topic": "analytics",
"high_water_mark": 2000,
"linger": 5000
}
},
"diagnostics-broadcast": {
"uri": "zmq://tcp://*:5556",
"sink": "diagnostics",
"enabled": true,
"config": {
"socket_type": "PUB",
"topic": "system",
"high_water_mark": 500
}
}
}
}
}
Local Process Communication
# Start ZMQ subscriber for local testing
python3 -c "
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('ipc:///tmp/cvedia-analytics.ipc')
# Subscriptions are prefix matches; use the topic configured on the IPC handler
socket.setsockopt(zmq.SUBSCRIBE, b'local.analytics')
while True:
    topic, message = socket.recv_multipart()
    print(f'Topic: {topic.decode()}, Message: {message.decode()}')
"
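The subscriber above unpacks each message into a topic and a body. Assuming that two-frame layout holds (topic frame followed by a JSON body frame, which the `recv_multipart()` call implies but this page does not otherwise specify), the received frames could be decoded as follows. The function `decode_frames` and the sample payload are hypothetical.

```python
import json


def decode_frames(frames):
    """Split a two-frame message into (topic, parsed JSON body).

    Assumption: frame 0 is the UTF-8 topic and frame 1 is a UTF-8 JSON document.
    """
    if len(frames) != 2:
        raise ValueError(f"expected 2 frames, got {len(frames)}")
    topic_frame, body_frame = frames
    return topic_frame.decode("utf-8"), json.loads(body_frame.decode("utf-8"))


sample = [b"local.analytics", b'{"camera_id": "cam-001", "event_type": "custom_event"}']
topic, event = decode_frames(sample)
print(topic, event["camera_id"])
```

In the subscriber loop, `decode_frames(socket.recv_multipart())` would replace the manual `decode()` calls.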
See Also
- Output Plugins Overview
- Output Plugin - Main output coordination system
- WriteData Plugin - File-based output
- MQTT Plugin - MQTT messaging protocol
- REST Plugin - HTTP API integration
- NBus Plugin - Network bus communication
- TopBus Plugin - High-performance message bus
- Lua Scripting Reference - Full API documentation
- ZeroMQ Documentation - Official ZMQ documentation