REST Plugin¶
Description¶
The REST plugin provides UDP-based data transmission capabilities through the Output plugin system. Despite its name suggesting HTTP REST functionality, this plugin specifically handles UDP protocol communication for real-time data streaming. It registers URI handlers for the udp:// and udpimage:// schemes, enabling efficient network transmission of analytics data and images via UDP sockets.
Key Features¶
- UDP Data Transmission: High-performance UDP socket communication
- Image Streaming: Specialized UDP image transmission (udpimage://)
- JSON Data Export: Structured data transmission via UDP (udp://)
- Output Handler Integration: Registered with Output plugin system
- Configurable Endpoints: Flexible IP address and port configuration
- Real-time Streaming: Low-latency UDP transmission for live data
- Binary and Text Support: Handles both structured data and binary images
- Network Protocol Abstraction: Simplified UDP communication through URI scheme
When to Use¶
- Real-time data streaming to UDP-enabled applications
- Low-latency network transmission of analytics results
- Integration with UDP-based monitoring systems
- Streaming structured data (JSON) over UDP
- Transmitting images or video frames via UDP
- Network communication with custom UDP servers
- High-frequency data export where TCP overhead is undesirable
- Integration with legacy systems expecting UDP data
Requirements¶
Software Dependencies¶
- Network connectivity to target UDP endpoints
- UDP socket support (standard on all platforms)
- Optional: Image encoding libraries (for udpimage://)
Network Requirements¶
- UDP network connectivity to destination hosts
- Configurable UDP ports (default: 1234)
- Network bandwidth appropriate for data volume
- Firewall rules allowing outbound UDP traffic to the target ports
- Target systems capable of receiving UDP data
Configuration¶
The REST plugin is configured through the Output plugin system using the udp:// and udpimage:// URI schemes.
Basic UDP Data Configuration¶
{
"output": {
"handlers": {
"udp-data": {
"uri": "udp://192.168.1.100:5000",
"sink": "events",
"enabled": true,
"config": {
"ip": "192.168.1.100",
"port": "5000"
}
}
}
}
}
UDP Image Transmission Configuration¶
{
"output": {
"handlers": {
"udp-images": {
"uri": "udpimage://192.168.1.200:6000",
"sink": "crops",
"enabled": true,
"config": {
"ip": "192.168.1.200",
"port": "6000"
}
}
}
}
}
Multiple UDP Endpoints¶
{
"output": {
"handlers": {
"primary-udp": {
"uri": "udp://primary-server.example.com:8080",
"sink": "events",
"enabled": true,
"config": {
"ip": "primary-server.example.com",
"port": "8080"
}
},
"backup-udp": {
"uri": "udp://backup-server.example.com:8080",
"sink": "events",
"enabled": true,
"config": {
"ip": "backup-server.example.com",
"port": "8080"
}
},
"image-stream": {
"uri": "udpimage://monitoring.example.com:9000",
"sink": "output",
"enabled": true,
"config": {
"ip": "monitoring.example.com",
"port": "9000"
}
}
}
}
}
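Each handler above states its target twice, once in uri and once in config.ip / config.port. The source does not say which value takes precedence, so a cautious approach is to keep them identical. The following Python sketch is a hypothetical pre-deployment check (the function name check_handlers and the sample text are illustrative, not part of the plugin) that flags handlers whose URI and config disagree:

```python
import json
from urllib.parse import urlsplit


def check_handlers(config_text):
    """Return (handler_name, problem) tuples for an "output.handlers" config."""
    handlers = json.loads(config_text)["output"]["handlers"]
    problems = []
    for name, handler in handlers.items():
        parts = urlsplit(handler["uri"])
        if parts.scheme not in ("udp", "udpimage"):
            problems.append((name, f"unexpected scheme {parts.scheme!r}"))
            continue
        cfg = handler.get("config", {})
        # urlsplit lowercases the host, so compare case-insensitively
        if "ip" in cfg and cfg["ip"].lower() != parts.hostname:
            problems.append((name, "config.ip differs from URI host"))
        if "port" in cfg and str(cfg["port"]) != str(parts.port):
            problems.append((name, "config.port differs from URI port"))
    return problems


# Illustrative input: the second handler has a deliberate port mismatch
sample = """
{"output": {"handlers": {
  "udp-data": {"uri": "udp://192.168.1.100:5000", "sink": "events",
               "enabled": true,
               "config": {"ip": "192.168.1.100", "port": "5000"}},
  "udp-images": {"uri": "udpimage://192.168.1.200:6000", "sink": "crops",
                 "enabled": true,
                 "config": {"ip": "192.168.1.200", "port": "6001"}}
}}}
"""
print(check_handlers(sample))
```

Running a check like this before loading the configuration catches copy-and-paste mismatches that would otherwise only show up as missing data at the receiver.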
Configuration Schema¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| uri | string | required | UDP URI (udp:// or udpimage://) |
| sink | string | "output" | Data sink to connect to |
| enabled | boolean | true | Enable/disable UDP output |
| config.ip | string | "127.0.0.1" | Target IP address for UDP transmission |
| config.port | string | "1234" | Target port for UDP transmission |
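To make the defaults in the table concrete, here is a small Python sketch that fills in omitted fields the way the schema describes. It only illustrates the table: the helper name with_defaults is hypothetical, and how the plugin itself applies defaults internally is not documented here.

```python
# Defaults copied from the schema table above
HANDLER_DEFAULTS = {"sink": "output", "enabled": True}
CONFIG_DEFAULTS = {"ip": "127.0.0.1", "port": "1234"}


def with_defaults(handler):
    """Return a copy of a handler dict with schema defaults filled in."""
    if "uri" not in handler:
        raise ValueError("uri is required")
    merged = {**HANDLER_DEFAULTS, **handler}
    merged["config"] = {**CONFIG_DEFAULTS, **handler.get("config", {})}
    return merged


print(with_defaults({"uri": "udp://localhost:1234"}))
```

Note that the port default is the string "1234", not a number, matching the string type given in the table.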
URI Schemes¶
The REST plugin registers the following URI schemes with the Output plugin:
UDP Data Scheme (udp://)¶
Format: udp://host:port
- Purpose: Transmit structured data (JSON) via UDP
- Data Format: String representation of sink data
- Use Case: Real-time analytics data streaming
Examples:
udp://localhost:1234 # Local UDP server
udp://192.168.1.100:5000 # Remote UDP endpoint
udp://analytics-server.com:8080 # Named host with custom port
UDP Image Scheme (udpimage://)¶
Format: udpimage://host:port
- Purpose: Transmit image data via UDP
- Data Format: Binary image data from sink
- Use Case: Real-time image/video frame streaming
- Requirements: Sink data must contain "image" field with cbuffer
Examples:
udpimage://192.168.1.200:6000 # Image streaming endpoint
udpimage://video-monitor.local:7000 # Local video monitoring
udpimage://stream.example.com:9999 # Remote image streaming
API Reference¶
C++ API¶
The REST plugin implements the iface::OutputHandler interface for UDP communication:
class RESTOutputHandler : public iface::OutputHandler {
public:
// Constructor
RESTOutputHandler(const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
const pCValue& config);
// Factory method (registered with Output plugin)
static std::shared_ptr<iface::OutputHandler> create(
const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// UDP transmission
expected<bool> write(pCValue sinkData = VAL(),
std::string dynamicPath = "") override;
void stop() override;
void close() override;
// Handler information
std::string getSink() override;
};
UDP Socket Implementation¶
// Internal UDP socket handling
class UDPsocket {
public:
bool open();
void close();
// Data transmission methods
int send(const std::string& data, const IPv4& address);
int send(const std::string& ip, const IPv4& address);
// Address handling
static IPv4 IPv4(const std::string& ip, uint16_t port);
};
Core REST Classes¶
class REST {
public:
struct config {
std::string ip = "127.0.0.1";
std::string port = "1234";
// Additional configuration parameters
};
// Configuration management
bool setConfig(pCValue config);
pCValue getConfig() const;
};
Lua API¶
The REST plugin is accessed through the Output plugin system:
-- UDP outputs are created through the Output plugin
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPStreamer")
-- Add UDP data handler
local udpHandler = output:addHandler(
"udp-stream",
"udp://192.168.1.100:5000",
"events",
{
ip = "192.168.1.100",
port = "5000"
}
)
-- Add UDP image handler
local udpImageHandler = output:addHandler(
"udp-images",
"udpimage://192.168.1.200:6000",
"crops",
{
ip = "192.168.1.200",
port = "6000"
}
)
Note: The REST plugin runs as a set of output handlers registered for the udp:// and udpimage:// URI schemes. The Output plugin delegates any handler whose URI uses one of these schemes to it.
Examples¶
Basic UDP Data Streaming¶
-- Create output instance for UDP data streaming
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPDataStreamer")
-- Configure UDP data transmission
local udpConfig = {
ip = "192.168.1.100",
port = "5000"
}
local udpHandler = output:addHandler(
"analytics-udp",
"udp://192.168.1.100:5000",
"events",
udpConfig
)
if udpHandler then
api.logging.LogInfo("UDP data streaming configured")
api.logging.LogInfo("Target: 192.168.1.100:5000")
api.logging.LogInfo("Data: JSON analytics events")
-- Events from the "events" sink will be automatically transmitted as JSON strings via UDP
else
api.logging.LogError("Failed to create UDP data handler")
end
UDP Image Streaming¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPImageStreamer")
-- Configure UDP image transmission
local imageConfig = {
ip = "192.168.1.200",
port = "6000"
}
local imageHandler = output:addHandler(
"image-udp",
"udpimage://192.168.1.200:6000",
"output", -- Using "output" sink which contains image data
imageConfig
)
if imageHandler then
api.logging.LogInfo("UDP image streaming configured")
api.logging.LogInfo("Target: 192.168.1.200:6000")
api.logging.LogInfo("Data: Binary image frames")
-- Image data from the "output" sink will be transmitted via UDP
-- Requires sink data to contain "image" field with cbuffer
else
api.logging.LogError("Failed to create UDP image handler")
end
Multi-Destination UDP Streaming¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MultiUDPStreamer")
-- Configure multiple UDP destinations
local destinations = {
{
name = "primary-analytics",
uri = "udp://analytics1.example.com:8080",
sink = "events",
config = {ip = "analytics1.example.com", port = "8080"}
},
{
name = "backup-analytics",
uri = "udp://analytics2.example.com:8080",
sink = "events",
config = {ip = "analytics2.example.com", port = "8080"}
},
{
name = "monitoring-images",
uri = "udpimage://monitor.example.com:9000",
sink = "crops",
config = {ip = "monitor.example.com", port = "9000"}
}
}
-- Create all UDP handlers
local activeHandlers = {}
for _, destination in ipairs(destinations) do
local handler = output:addHandler(
destination.name,
destination.uri,
destination.sink,
destination.config
)
if handler then
table.insert(activeHandlers, destination.name)
api.logging.LogInfo("Created UDP handler: " .. destination.name)
api.logging.LogInfo(" Target: " .. destination.config.ip .. ":" .. destination.config.port)
api.logging.LogInfo(" Type: " .. (destination.uri:match("udpimage://") and "Image" or "Data"))
else
api.logging.LogError("Failed to create handler: " .. destination.name)
end
end
api.logging.LogInfo("Active UDP handlers: " .. #activeHandlers)
Local Network Broadcasting¶
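local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "LocalUDPBroadcaster")
-- Configure UDP delivery to several hosts on the local network
-- (one unicast handler per host, not a broadcast address)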
local localConfigs = {
{
name = "workstation-1",
ip = "192.168.1.101",
port = "5000",
sink = "events"
},
{
name = "workstation-2",
ip = "192.168.1.102",
port = "5000",
sink = "events"
},
{
name = "server-monitor",
ip = "192.168.1.10",
port = "8080",
sink = "diagnostics"
}
}
-- Create handlers for local network distribution
for _, config in ipairs(localConfigs) do
local handler = output:addHandler(
config.name,
"udp://" .. config.ip .. ":" .. config.port,
config.sink,
{
ip = config.ip,
port = config.port
}
)
if handler then
api.logging.LogInfo("Local UDP configured: " .. config.name)
api.logging.LogInfo(" Address: " .. config.ip .. ":" .. config.port)
api.logging.LogInfo(" Sink: " .. config.sink)
end
end
High-Frequency Data Streaming¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "HighFrequencyUDP")
-- Configure for high-frequency real-time data
local realtimeConfig = {
ip = "127.0.0.1", -- Local processing for minimal latency
port = "9999"
}
local realtimeHandler = output:addHandler(
"realtime-udp",
"udp://127.0.0.1:9999",
"raw-detections", -- High-frequency detection data
realtimeConfig
)
if realtimeHandler then
api.logging.LogInfo("High-frequency UDP streaming configured")
api.logging.LogInfo("Target: localhost:9999")
api.logging.LogInfo("Data: Raw detection events")
api.logging.LogInfo("Protocol: UDP (low latency)")
-- This configuration is optimal for:
-- - Real-time applications requiring minimal latency
-- - High-frequency data streams
-- - Local processing and analysis
-- - Applications where occasional packet loss is acceptable
end
Custom UDP Server Integration¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "CustomUDPIntegration")
-- Configure for integration with custom UDP server application
local customConfig = {
ip = "custom-server.internal.com",
port = "12345"
}
local customHandler = output:addHandler(
"custom-integration",
"udp://custom-server.internal.com:12345",
"events",
customConfig
)
if customHandler then
api.logging.LogInfo("Custom UDP server integration configured")
api.logging.LogInfo("Server: custom-server.internal.com:12345")
-- The UDP handler will send JSON-formatted event data
-- Example data format sent via UDP:
-- {
-- "timestamp": 1609459200,
-- "camera_id": "camera-001",
-- "events": [
-- {
-- "type": "intrusion",
-- "confidence": 0.95,
-- "bbox": {"x": 100, "y": 150, "width": 50, "height": 75}
-- }
-- ]
-- }
end
Error Handling and Monitoring¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MonitoredUDPStreamer")
-- Configure UDP with monitoring
local monitoredConfigs = {
{
name = "primary-udp",
uri = "udp://primary.example.com:8080",
sink = "events"
},
{
name = "secondary-udp",
uri = "udp://secondary.example.com:8080",
sink = "events"
}
}
local successfulHandlers = {}
local failedHandlers = {}
-- Create handlers with error tracking
for _, config in ipairs(monitoredConfigs) do
local handler = output:addHandler(
config.name,
config.uri,
config.sink,
{
ip = config.uri:match("://([^:]+)"),
port = config.uri:match(":(%d+)$")
}
)
if handler then
table.insert(successfulHandlers, config.name)
api.logging.LogInfo("✓ UDP handler created: " .. config.name)
else
table.insert(failedHandlers, config.name)
api.logging.LogError("✗ UDP handler failed: " .. config.name)
end
end
-- Report status
api.logging.LogInfo("UDP Streaming Status:")
api.logging.LogInfo(" Successful: " .. #successfulHandlers .. "/" .. #monitoredConfigs)
api.logging.LogInfo(" Failed: " .. #failedHandlers .. "/" .. #monitoredConfigs)
if #failedHandlers > 0 then
api.logging.LogError(" Failed handlers: " .. table.concat(failedHandlers, ", "))
end
-- Function to check handler status (if available)
function checkUDPStatus()
for _, name in ipairs(successfulHandlers) do
local hasHandler = output:hasHandler(name)
if hasHandler then
api.logging.LogInfo("Handler " .. name .. ": Active")
else
api.logging.LogInfo("Handler " .. name .. ": Inactive")
end
end
end
Best Practices¶
Network Configuration¶
- Target Selection:
  - Use the loopback address (127.0.0.1) for same-machine communication
  - Use specific IP addresses for targeted transmission
  - Avoid broadcast addresses unless specifically needed
  - Consider network topology and routing
- Port Management:
  - Use ports above 1024 for non-privileged operation
  - Avoid well-known service ports (80, 443, 22, etc.)
  - Coordinate port usage across applications
  - Document port assignments for maintenance
- Data Volume Considerations:
  - Monitor network bandwidth usage
  - Keep each datagram within UDP size limits: at most 65,507 bytes of payload over IPv4, and about 1,472 bytes to avoid IP fragmentation on a standard 1500-byte Ethernet MTU
  - Implement data compression for large payloads
  - Use appropriate sink selection for data volume
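The size limits mentioned under Data Volume Considerations follow from header arithmetic: an IPv4 packet is at most 65,535 bytes, of which 20 bytes are the minimal IP header and 8 bytes the UDP header. The Python sketch below classifies a payload against those limits. The 1500-byte MTU is an assumption about a typical Ethernet path, and the function name classify_payload is illustrative.

```python
MAX_UDP_PAYLOAD_IPV4 = 65535 - 20 - 8  # 65507 bytes: hard limit per datagram
ETHERNET_SAFE_PAYLOAD = 1500 - 20 - 8  # 1472 bytes: fits one 1500-byte MTU frame


def classify_payload(payload: bytes) -> str:
    """Say how a payload of this size would travel as a single UDP datagram."""
    size = len(payload)
    if size > MAX_UDP_PAYLOAD_IPV4:
        return "too-large"      # cannot be sent as one IPv4 UDP datagram
    if size > ETHERNET_SAFE_PAYLOAD:
        return "fragmented"     # sendable, but IP will split it into fragments
    return "single-frame"       # fits in one Ethernet frame


print(classify_payload(b"x" * 1472))   # single-frame
print(classify_payload(b"x" * 20000))  # fragmented
```

Fragmented datagrams are lost entirely if any one fragment is dropped, which is why large image payloads tend to be more fragile over UDP than small JSON events.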
Performance Optimization¶
- UDP Characteristics:
  - UDP is connectionless and does not guarantee delivery
  - No built-in error correction or retransmission
  - Lower latency than TCP but potential for packet loss
  - Best for real-time applications tolerating data loss
- Data Format:
  - JSON strings for udp:// scheme (human-readable)
  - Binary image data for udpimage:// scheme (efficient)
  - Keep data payloads within reasonable UDP limits
  - Consider data serialization efficiency
- Resource Management:
  - UDP sockets are created and closed for each transmission
  - No persistent connections maintained
  - Minimal server-side resource usage
  - Suitable for high-frequency, short-duration transmissions
Error Handling¶
- Network Failures:
  - UDP transmission failures are reported but not retried
  - No automatic failover or retry mechanisms
  - Implement application-level monitoring if needed
  - Consider multiple destination handlers for redundancy
- Data Validation:
  - Receiving applications should validate UDP data
  - UDP's 16-bit checksum is the only built-in integrity check
  - Consider adding checksums or sequence numbers
  - Implement application-level error detection
- Troubleshooting:
  - Use network monitoring tools (tcpdump, Wireshark)
  - Check firewall rules for UDP traffic
  - Verify target applications are listening on specified ports
  - Monitor system logs for transmission errors
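As one possible shape for the "checksums or sequence numbers" suggestion, the Python sketch below prefixes each payload with a 4-byte sequence number and a CRC32. This framing is hypothetical: the source does not describe any hook for altering what the plugin sends, so the sketch assumes you control both ends, for example a small relay process between the plugin and the final consumer.

```python
import struct
import zlib

# Header layout (an assumption for this sketch): sequence number and
# CRC32 of the payload, both unsigned 32-bit, network byte order
HEADER = struct.Struct("!II")


def wrap(seq: int, payload: bytes) -> bytes:
    """Prefix a payload with its sequence number and CRC32."""
    return HEADER.pack(seq & 0xFFFFFFFF, zlib.crc32(payload)) + payload


def unwrap(datagram: bytes):
    """Return (seq, payload); raise ValueError if the datagram is damaged."""
    if len(datagram) < HEADER.size:
        raise ValueError("datagram shorter than header")
    seq, crc = HEADER.unpack_from(datagram)
    payload = datagram[HEADER.size:]
    if zlib.crc32(payload) != crc:
        raise ValueError("checksum mismatch")
    return seq, payload


framed = wrap(7, b'{"type": "intrusion"}')
print(unwrap(framed))
```

The sequence number lets the receiver notice gaps and reordering, while the CRC32 catches corruption that slips past the transport checksum.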
Troubleshooting¶
Connection Issues¶
- "send(): failed (REQ)" error
  - Cause: Network connectivity issues or target unreachable
  - Solution:
      # Test UDP connectivity
      nc -u target-host 5000
      echo "test message" | nc -u target-host 5000
      # Check routing
      traceroute target-host
      ping target-host
  - Check: Firewall rules, network configuration, target availability
- No data received at target
  - Cause: Incorrect IP/port configuration or target not listening
  - Solution:
      # Test UDP port listening
      nc -l -u 5000  # Listen on UDP port 5000
      # Check if port is in use (-l includes listening sockets)
      netstat -uln | grep :5000
      ss -uln | grep :5000
  - Check: Target application configuration, port conflicts
- Intermittent transmission failures
  - Cause: Network congestion, packet loss, or target overload
  - Solution: Monitor network utilization, implement rate limiting
  - Check: Network performance, target processing capacity
Configuration Issues¶
- Handler creation fails
  - Cause: Invalid configuration parameters or network setup issues
  - Solution: Validate IP addresses and port numbers
  - Check:
      -- Validate IP address format
      local ip = "192.168.1.100"
      if not ip:match("^%d+%.%d+%.%d+%.%d+$") then
        api.logging.LogError("Invalid IP address format")
      end
      -- Validate port range
      local port = tonumber("5000")
      if not port or port < 1 or port > 65535 then
        api.logging.LogError("Invalid port number")
      end
- Incorrect data format
  - Cause: Sink data not in expected format for UDP transmission
  - Solution: Verify sink data structure and content
  - Check: For udpimage://, ensure sink data contains "image" field
- Performance degradation
  - Cause: High-frequency transmissions overwhelming network or target
  - Solution: Implement data throttling or selective transmission
  - Check: Monitor CPU usage, network utilization, target processing
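Data throttling can be as simple as enforcing a minimum interval between transmissions. The Python sketch below shows the idea. Where such a throttle would run (inside a relay, or ported to the Lua side) is left open, and the class name MinIntervalThrottle is illustrative rather than a plugin API.

```python
import time


class MinIntervalThrottle:
    """Allow at most one send per min_interval_s seconds; drop the rest."""

    def __init__(self, min_interval_s, clock=time.monotonic):
        self.min_interval_s = min_interval_s
        self.clock = clock      # injectable so the logic can be tested
        self._last = None       # time of the last allowed send

    def allow(self):
        """Return True if a send is permitted right now."""
        now = self.clock()
        if self._last is None or now - self._last >= self.min_interval_s:
            self._last = now
            return True
        return False


throttle = MinIntervalThrottle(0.1)  # at most about 10 sends per second
sent = sum(1 for _ in range(1000) if throttle.allow())
print(f"allowed {sent} of 1000 back-to-back attempts")
```

Dropping excess events suits UDP streams where only the latest state matters. If every event must arrive, queueing or a reliable transport is a better fit than throttling.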
Network Troubleshooting¶
- Packet Loss
  - Detection:
      # Monitor UDP statistics
      netstat -su | grep -i udp
      cat /proc/net/snmp | grep Udp
  - Mitigation: Reduce transmission frequency, increase buffer sizes
  - Monitoring: Implement application-level sequence numbers
- Firewall Issues
  - Windows:
      # Check Windows firewall for outbound rules
      netsh advfirewall firewall show rule name=all dir=out
      # Add UDP rule if needed (remoteport is the destination port)
      netsh advfirewall firewall add rule name="CVEDIA UDP Out" dir=out action=allow protocol=udp remoteport=5000
  - Linux:
      # Check iptables for UDP rules
      iptables -L OUTPUT -v -n | grep udp
      # Add UDP rule if needed
      iptables -A OUTPUT -p udp --dport 5000 -j ACCEPT
- Network Monitoring
  - Real-time monitoring:
      # Monitor UDP traffic with tcpdump
      tcpdump -i eth0 udp port 5000
      # Monitor UDP packets with specific host
      tcpdump -i any udp and host 192.168.1.100 and port 5000
  - Analysis: Use Wireshark for detailed packet analysis
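Application-level sequence numbers make packet loss measurable at the receiver. The Python sketch below counts gaps in an incoming sequence. It assumes each message already carries a monotonically increasing integer, which the plugin is not documented to add, so both the numbering and the class name LossCounter are assumptions of this example.

```python
class LossCounter:
    """Estimate datagram loss from gaps in received sequence numbers."""

    def __init__(self):
        self.expected = None  # next sequence number we expect to see
        self.received = 0
        self.lost = 0

    def observe(self, seq):
        self.received += 1
        if self.expected is not None and seq > self.expected:
            # Everything between expected and seq is counted as lost.
            # A late (reordered) datagram is not subtracted again, so
            # the figure is an upper bound on the real loss.
            self.lost += seq - self.expected
        if self.expected is None or seq >= self.expected:
            self.expected = seq + 1


counter = LossCounter()
for seq in [1, 2, 5, 6, 4, 8]:
    counter.observe(seq)
print(counter.received, counter.lost)
```

Comparing this application-level figure with the kernel counters from netstat -su helps separate loss on the network from drops caused by a full receive buffer.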
Debugging Tools¶
- Network Testing:
    # Send test UDP packet
    echo "test" | nc -u 192.168.1.100 5000
    # Listen for UDP packets
    nc -l -u 5000
    # Test with timeout
    timeout 10 nc -l -u 5000
- Port Scanning:
    # Check if UDP port is open
    nmap -sU -p 5000 192.168.1.100
    # Scan range of UDP ports
    nmap -sU -p 5000-5010 192.168.1.100
- System Monitoring:
    # Monitor system UDP statistics
    watch -n 1 'cat /proc/net/snmp | grep Udp'
    # Monitor network interface statistics
    watch -n 1 'cat /proc/net/dev'
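When nc is not installed, the same send-and-listen test can be done with a few lines of Python. The sketch below sends one JSON datagram to a receiver on the loopback interface and reads it back, which confirms that the local UDP stack and JSON round trip work before the network is involved. The function name and message fields are illustrative only.

```python
import json
import socket


def loopback_roundtrip(message: dict, timeout_s: float = 2.0) -> dict:
    """Send one JSON datagram to a local receiver and return what arrived."""
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        receiver.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        receiver.settimeout(timeout_s)   # fail instead of blocking forever
        payload = json.dumps(message).encode("utf-8")
        sender.sendto(payload, receiver.getsockname())
        data, _addr = receiver.recvfrom(65536)
        return json.loads(data.decode("utf-8"))
    finally:
        sender.close()
        receiver.close()


print(loopback_roundtrip({"type": "test", "confidence": 0.95}))
```

To test a remote target instead, keep only the sender half and pass the target's address to sendto, then watch for the datagram with tcpdump on the receiving host.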
Integration Examples¶
Simple UDP Server (Python)¶
#!/usr/bin/env python3
import json
import socket


class UDPReceiver:
    def __init__(self, host='0.0.0.0', port=5000):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))
        # Without a timeout, recvfrom() blocks indefinitely and the
        # socket.timeout branch in start() would never run
        self.socket.settimeout(1.0)
        self.running = False

    def start(self):
        self.running = True
        print(f"UDP Server listening on {self.host}:{self.port}")
        while self.running:
            try:
                data, addr = self.socket.recvfrom(65536)
                self.process_data(data.decode('utf-8'), addr)
            except socket.timeout:
                # No datagram within the timeout; re-check self.running
                continue
            except Exception as e:
                print(f"Error receiving data: {e}")

    def process_data(self, data, address):
        try:
            # Try to parse as JSON
            json_data = json.loads(data)
            print(f"Received JSON from {address}: {json_data}")
            # Process analytics data here
        except json.JSONDecodeError:
            # Handle as plain text
            print(f"Received text from {address}: {data}")

    def stop(self):
        self.running = False
        self.socket.close()


if __name__ == "__main__":
    server = UDPReceiver('0.0.0.0', 5000)
    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()
UDP Image Receiver (Python)¶
#!/usr/bin/env python3
import socket

import cv2
import numpy as np


class UDPImageReceiver:
    def __init__(self, host='0.0.0.0', port=6000):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))

    def start(self):
        print(f"UDP Image Server listening on {self.host}:{self.port}")
        while True:
            try:
                # Assumes each datagram carries one complete encoded image
                data, addr = self.socket.recvfrom(65536)
                self.process_image(data, addr)
            except Exception as e:
                print(f"Error receiving image: {e}")

    def process_image(self, data, address):
        try:
            # Convert bytes to numpy array
            nparr = np.frombuffer(data, np.uint8)
            # Decode image (returns None if the data is not a valid image)
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if img is not None:
                print(f"Received image from {address}: {img.shape}")
                # Process image here
                cv2.imshow('UDP Image', img)
                cv2.waitKey(1)
            else:
                print(f"Failed to decode image from {address}")
        except Exception as e:
            print(f"Error processing image: {e}")


if __name__ == "__main__":
    receiver = UDPImageReceiver('0.0.0.0', 6000)
    try:
        receiver.start()
    except KeyboardInterrupt:
        receiver.socket.close()
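If decoding fails in a receiver like the one above, it can help to inspect the first bytes of the datagram before involving OpenCV. The Python sketch below recognizes a few common image signatures. The source does not state which encoding udpimage:// uses, so treat the list of formats as an assumption for debugging rather than a description of the plugin's output.

```python
def sniff_image_format(data: bytes) -> str:
    """Guess an image container from its leading magic bytes."""
    if data.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if data.startswith(b"BM"):
        return "bmp"
    return "unknown"  # possibly raw pixels, a partial frame, or another format


print(sniff_image_format(b"\xff\xd8\xff\xe0" + b"\x00" * 16))
print(sniff_image_format(b"not an image"))
```

An "unknown" result for every datagram suggests the payload is either raw pixel data or an image split across several datagrams, neither of which cv2.imdecode can handle directly.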
Node.js UDP Server¶
const dgram = require('dgram');
const server = dgram.createSocket('udp4');
server.on('error', (err) => {
console.log(`Server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
try {
// Try to parse as JSON
const jsonData = JSON.parse(msg.toString());
console.log(`Received JSON from ${rinfo.address}:${rinfo.port}`, jsonData);
// Process CVEDIA-RT analytics data
if (jsonData.events) {
jsonData.events.forEach(event => {
console.log(`Event: ${event.type}, Confidence: ${event.confidence}`);
});
}
} catch (err) {
// Handle as plain text
console.log(`Received text from ${rinfo.address}:${rinfo.port}: ${msg}`);
}
});
server.on('listening', () => {
const address = server.address();
console.log(`UDP Server listening on ${address.address}:${address.port}`);
});
server.bind(5000);
See Also¶
- Output Plugins Overview
- Output Plugin - Main output coordination system
- MQTT Plugin - MQTT messaging protocol
- WriteData Plugin - File-based output
- GStreamerWriter Plugin - Video streaming
- HLS Plugin - HTTP Live Streaming
- Network Configuration Guide - CVEDIA-RT networking
- Plugin Development Guide - Creating custom plugins