REST Plugin

Description

The REST plugin provides UDP-based data transmission capabilities through the Output plugin system. Despite its name suggesting HTTP REST functionality, this plugin specifically handles UDP protocol communication for real-time data streaming. It registers URI handlers for udp:// and udpimage:// schemes, enabling efficient network transmission of analytics data and images via UDP sockets.

Key Features

  • UDP Data Transmission: High-performance UDP socket communication
  • Image Streaming: Specialized UDP image transmission (udpimage://)
  • JSON Data Export: Structured data transmission via UDP (udp://)
  • Output Handler Integration: Registered with Output plugin system
  • Configurable Endpoints: Flexible IP address and port configuration
  • Real-time Streaming: Low-latency UDP transmission for live data
  • Binary and Text Support: Handles both structured data and binary images
  • Network Protocol Abstraction: Simplified UDP communication through URI scheme

When to Use

  • Real-time data streaming to UDP-enabled applications
  • Low-latency network transmission of analytics results
  • Integration with UDP-based monitoring systems
  • Streaming structured data (JSON) over UDP
  • Transmitting images or video frames via UDP
  • Network communication with custom UDP servers
  • High-frequency data export where TCP overhead is undesirable
  • Integration with legacy systems expecting UDP data

Requirements

Software Dependencies

  • Network connectivity to target UDP endpoints
  • UDP socket support (standard on all platforms)
  • Optional: Image encoding libraries (for udpimage://)

Network Requirements

  • UDP network connectivity to destination hosts
  • Configurable UDP ports (default: 1234)
  • Network bandwidth appropriate for data volume
  • Firewall rules allowing outbound UDP traffic to the destination ports
  • Target systems capable of receiving UDP data

Configuration

The REST plugin is configured through the Output plugin system using udp:// and udpimage:// URI schemes.

Basic UDP Data Configuration

{
  "output": {
    "handlers": {
      "udp-data": {
        "uri": "udp://192.168.1.100:5000",
        "sink": "events",
        "enabled": true,
        "config": {
          "ip": "192.168.1.100",
          "port": "5000"
        }
      }
    }
  }
}

UDP Image Transmission Configuration

{
  "output": {
    "handlers": {
      "udp-images": {
        "uri": "udpimage://192.168.1.200:6000",
        "sink": "crops",
        "enabled": true,
        "config": {
          "ip": "192.168.1.200",
          "port": "6000"
        }
      }
    }
  }
}

Multiple UDP Endpoints

{
  "output": {
    "handlers": {
      "primary-udp": {
        "uri": "udp://primary-server.example.com:8080",
        "sink": "events",
        "enabled": true,
        "config": {
          "ip": "primary-server.example.com",
          "port": "8080"
        }
      },
      "backup-udp": {
        "uri": "udp://backup-server.example.com:8080",
        "sink": "events", 
        "enabled": true,
        "config": {
          "ip": "backup-server.example.com",
          "port": "8080"
        }
      },
      "image-stream": {
        "uri": "udpimage://monitoring.example.com:9000",
        "sink": "output",
        "enabled": true,
        "config": {
          "ip": "monitoring.example.com",
          "port": "9000"
        }
      }
    }
  }
}

Configuration Schema

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| uri | string | required | UDP URI (udp:// or udpimage://) |
| sink | string | "output" | Data sink to connect to |
| enabled | boolean | true | Enable/disable UDP output |
| config.ip | string | "127.0.0.1" | Target IP address for UDP transmission |
| config.port | string | "1234" | Target port for UDP transmission |
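
To make the defaults in the schema concrete, the following Python sketch fills them into a single handler entry before it is written to the configuration file. The helper name normalize_handler is hypothetical and is not part of the plugin; it only mirrors the defaults listed in the table and assumes the plugin applies them the same way.

```python
from urllib.parse import urlsplit

# Defaults copied from the Configuration Schema table.
HANDLER_DEFAULTS = {"sink": "output", "enabled": True}
CONFIG_DEFAULTS = {"ip": "127.0.0.1", "port": "1234"}
SCHEMES = {"udp", "udpimage"}


def normalize_handler(handler: dict) -> dict:
    """Return a copy of one handler entry with schema defaults filled in.

    Hypothetical pre-flight helper; the plugin itself does not expose it.
    """
    if "uri" not in handler:
        raise ValueError("uri is required")
    scheme = urlsplit(handler["uri"]).scheme
    if scheme not in SCHEMES:
        raise ValueError(f"unsupported scheme: {scheme!r}")

    result = {**HANDLER_DEFAULTS, **handler}
    result["config"] = {**CONFIG_DEFAULTS, **handler.get("config", {})}

    # Ports are strings in the schema, so convert before range-checking.
    port = int(result["config"]["port"])
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return result
```

Calling normalize_handler({"uri": "udp://192.168.1.100:5000"}) returns an entry whose sink, enabled, ip and port fields carry the documented defaults.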

URI Schemes

The REST plugin registers the following URI schemes with the Output plugin:

UDP Data Scheme (udp://)

Format: udp://host:port

  • Purpose: Transmit structured data (JSON) via UDP
  • Data Format: String representation of sink data
  • Use Case: Real-time analytics data streaming

Examples:

udp://localhost:1234              # Local UDP server  
udp://192.168.1.100:5000          # Remote UDP endpoint
udp://analytics-server.com:8080   # Named host with custom port

UDP Image Scheme (udpimage://)

Format: udpimage://host:port

  • Purpose: Transmit image data via UDP
  • Data Format: Binary image data from sink
  • Use Case: Real-time image/video frame streaming
  • Requirements: Sink data must contain an "image" field holding a cbuffer

Examples:

udpimage://192.168.1.200:6000           # Image streaming endpoint
udpimage://video-monitor.local:7000     # Local video monitoring
udpimage://stream.example.com:9999      # Remote image streaming
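
When generating handler entries from a script, it can help to split a URI into its scheme, host and port first. The sketch below uses Python's standard urllib.parse for that; parse_udp_uri is a hypothetical helper name, and the plugin's own URI parsing may differ in edge cases.

```python
from typing import Tuple
from urllib.parse import urlsplit


def parse_udp_uri(uri: str) -> Tuple[str, str, int]:
    """Split a udp:// or udpimage:// URI into (scheme, host, port)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("udp", "udpimage"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # .port raises ValueError by itself if the port is not a valid number.
    if parts.hostname is None or parts.port is None:
        raise ValueError("expected scheme://host:port")
    return parts.scheme, parts.hostname, parts.port
```

The returned host and port can be copied into config.ip and config.port (as strings) so that the URI and the config block stay consistent.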

API Reference

C++ API

The REST plugin implements the iface::OutputHandler interface for UDP communication:

class RESTOutputHandler : public iface::OutputHandler {
public:
    // Constructor
    RESTOutputHandler(const std::string& moduleName,
                     const std::string& schema, 
                     const std::string& sink,
                     const std::string& path, 
                     const pCValue& config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // UDP transmission
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;

    // Handler information
    std::string getSink() override;
};

UDP Socket Implementation

// Internal UDP socket handling
class UDPsocket {
public:
    bool open();
    void close();

    // Data transmission
    int send(const std::string& data, const IPv4& address);

    // Address handling
    static IPv4 IPv4(const std::string& ip, uint16_t port);
};

Core REST Classes

class REST {
public:
    struct config {
        std::string ip = "127.0.0.1";
        std::string port = "1234";
        // Additional configuration parameters
    };

    // Configuration management
    bool setConfig(pCValue config);
    pCValue getConfig() const;
};

Lua API

The REST plugin is accessed through the Output plugin system:

-- UDP outputs are created through the Output plugin
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPStreamer")

-- Add UDP data handler
local udpHandler = output:addHandler(
    "udp-stream",
    "udp://192.168.1.100:5000",
    "events",
    {
        ip = "192.168.1.100",
        port = "5000"
    }
)

-- Add UDP image handler
local udpImageHandler = output:addHandler(
    "udp-images",
    "udpimage://192.168.1.200:6000",
    "crops",
    {
        ip = "192.168.1.200", 
        port = "6000"
    }
)

Note: The REST plugin is not instantiated directly. The Output plugin delegates to it whenever a handler URI uses the udp:// or udpimage:// scheme.

Examples

Basic UDP Data Streaming

-- Create output instance for UDP data streaming
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPDataStreamer")

-- Configure UDP data transmission
local udpConfig = {
    ip = "192.168.1.100",
    port = "5000"
}

local udpHandler = output:addHandler(
    "analytics-udp",
    "udp://192.168.1.100:5000",
    "events",
    udpConfig
)

if udpHandler then
    api.logging.LogInfo("UDP data streaming configured")
    api.logging.LogInfo("Target: 192.168.1.100:5000")
    api.logging.LogInfo("Data: JSON analytics events")

    -- Events from the "events" sink will be automatically transmitted as JSON strings via UDP
else
    api.logging.LogError("Failed to create UDP data handler")
end

UDP Image Streaming

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "UDPImageStreamer")

-- Configure UDP image transmission
local imageConfig = {
    ip = "192.168.1.200",
    port = "6000"
}

local imageHandler = output:addHandler(
    "image-udp",
    "udpimage://192.168.1.200:6000",
    "output",  -- Using "output" sink which contains image data
    imageConfig
)

if imageHandler then
    api.logging.LogInfo("UDP image streaming configured")
    api.logging.LogInfo("Target: 192.168.1.200:6000")
    api.logging.LogInfo("Data: Binary image frames")

    -- Image data from the "output" sink will be transmitted via UDP
    -- Requires sink data to contain "image" field with cbuffer
else
    api.logging.LogError("Failed to create UDP image handler")
end

Multi-Destination UDP Streaming

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MultiUDPStreamer")

-- Configure multiple UDP destinations
local destinations = {
    {
        name = "primary-analytics",
        uri = "udp://analytics1.example.com:8080",
        sink = "events",
        config = {ip = "analytics1.example.com", port = "8080"}
    },
    {
        name = "backup-analytics",
        uri = "udp://analytics2.example.com:8080", 
        sink = "events",
        config = {ip = "analytics2.example.com", port = "8080"}
    },
    {
        name = "monitoring-images",
        uri = "udpimage://monitor.example.com:9000",
        sink = "crops",
        config = {ip = "monitor.example.com", port = "9000"}
    }
}

-- Create all UDP handlers
local activeHandlers = {}
for _, destination in ipairs(destinations) do
    local handler = output:addHandler(
        destination.name,
        destination.uri, 
        destination.sink,
        destination.config
    )

    if handler then
        table.insert(activeHandlers, destination.name)
        api.logging.LogInfo("Created UDP handler: " .. destination.name)
        api.logging.LogInfo("  Target: " .. destination.config.ip .. ":" .. destination.config.port)
        api.logging.LogInfo("  Type: " .. (destination.uri:match("udpimage://") and "Image" or "Data"))
    else
        api.logging.LogError("Failed to create handler: " .. destination.name)
    end
end

api.logging.LogInfo("Active UDP handlers: " .. #activeHandlers)

Local Network Broadcasting

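local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "LocalUDPBroadcaster")

-- Configure per-host (unicast) UDP delivery on the local network;
-- each target gets its own handler, and no broadcast address is used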
local localConfigs = {
    {
        name = "workstation-1",
        ip = "192.168.1.101",
        port = "5000",
        sink = "events"
    },
    {
        name = "workstation-2", 
        ip = "192.168.1.102",
        port = "5000",
        sink = "events"
    },
    {
        name = "server-monitor",
        ip = "192.168.1.10",
        port = "8080",
        sink = "diagnostics"
    }
}

-- Create handlers for local network distribution
for _, config in ipairs(localConfigs) do
    local handler = output:addHandler(
        config.name,
        "udp://" .. config.ip .. ":" .. config.port,
        config.sink,
        {
            ip = config.ip,
            port = config.port
        }
    )

    if handler then
        api.logging.LogInfo("Local UDP configured: " .. config.name)
        api.logging.LogInfo("  Address: " .. config.ip .. ":" .. config.port)
        api.logging.LogInfo("  Sink: " .. config.sink)
    end
end

High-Frequency Data Streaming

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "HighFrequencyUDP")

-- Configure for high-frequency real-time data
local realtimeConfig = {
    ip = "127.0.0.1",  -- Local processing for minimal latency
    port = "9999"
}

local realtimeHandler = output:addHandler(
    "realtime-udp",
    "udp://127.0.0.1:9999",
    "raw-detections",  -- High-frequency detection data
    realtimeConfig
)

if realtimeHandler then
    api.logging.LogInfo("High-frequency UDP streaming configured")
    api.logging.LogInfo("Target: localhost:9999")
    api.logging.LogInfo("Data: Raw detection events")
    api.logging.LogInfo("Protocol: UDP (low latency)")

    -- This configuration is optimal for:
    -- - Real-time applications requiring minimal latency
    -- - High-frequency data streams
    -- - Local processing and analysis
    -- - Applications where occasional packet loss is acceptable
end

Custom UDP Server Integration

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "CustomUDPIntegration")

-- Configure for integration with custom UDP server application
local customConfig = {
    ip = "custom-server.internal.com",
    port = "12345"
}

local customHandler = output:addHandler(  
    "custom-integration",
    "udp://custom-server.internal.com:12345",
    "events",
    customConfig
)

if customHandler then
    api.logging.LogInfo("Custom UDP server integration configured")
    api.logging.LogInfo("Server: custom-server.internal.com:12345")

    -- The UDP handler will send JSON-formatted event data
    -- Example data format sent via UDP:
    -- {
    --   "timestamp": 1609459200,
    --   "camera_id": "camera-001",
    --   "events": [
    --     {
    --       "type": "intrusion",
    --       "confidence": 0.95,
    --       "bbox": {"x": 100, "y": 150, "width": 50, "height": 75}
    --     }
    --   ]
    -- }
end

Error Handling and Monitoring

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MonitoredUDPStreamer")

-- Configure UDP with monitoring
local monitoredConfigs = {
    {
        name = "primary-udp",
        uri = "udp://primary.example.com:8080",
        sink = "events"
    },
    {
        name = "secondary-udp",
        uri = "udp://secondary.example.com:8080", 
        sink = "events"
    }
}

local successfulHandlers = {}
local failedHandlers = {}

-- Create handlers with error tracking
for _, config in ipairs(monitoredConfigs) do
    local handler = output:addHandler(
        config.name,
        config.uri,
        config.sink,
        {
            ip = config.uri:match("://([^:]+)"),
            port = config.uri:match(":(%d+)$")
        }
    )

    if handler then
        table.insert(successfulHandlers, config.name)
        api.logging.LogInfo("✓ UDP handler created: " .. config.name)
    else
        table.insert(failedHandlers, config.name)
        api.logging.LogError("✗ UDP handler failed: " .. config.name)
    end
end

-- Report status
api.logging.LogInfo("UDP Streaming Status:")
api.logging.LogInfo("  Successful: " .. #successfulHandlers .. "/" .. #monitoredConfigs)
api.logging.LogInfo("  Failed: " .. #failedHandlers .. "/" .. #monitoredConfigs)

if #failedHandlers > 0 then
    api.logging.LogError("  Failed handlers: " .. table.concat(failedHandlers, ", "))
end

-- Re-check that each handler is still registered (if output:hasHandler is available in your version)
function checkUDPStatus()
    for _, name in ipairs(successfulHandlers) do
        local hasHandler = output:hasHandler(name)
        if hasHandler then
            api.logging.LogInfo("Handler " .. name .. ": Active")
        else
            api.logging.LogInfo("Handler " .. name .. ": Inactive")
        end
    end
end

Best Practices

Network Configuration

  1. Target Selection:

    • Use the loopback address (127.0.0.1) for same-machine communication
    • Use specific IP addresses for targeted transmission
    • Avoid broadcast addresses unless specifically needed
    • Consider network topology and routing
  2. Port Management:

    • Use ports above 1024 for non-privileged operation
    • Avoid well-known service ports (80, 443, 22, etc.)
    • Coordinate port usage across applications
    • Document port assignments for maintenance
  3. Data Volume Considerations:

    • Monitor network bandwidth usage
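    • Keep payloads within UDP limits: a single datagram carries at most 65,507 bytes over IPv4, and datagrams larger than the path MTU (about 1,500 bytes on Ethernet) are fragmented at the IP layer, which raises the chance of loss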
    • Implement data compression for large payloads
    • Use appropriate sink selection for data volume
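
The size limit and the compression suggestion above can be sketched together in Python. This is an illustration for a custom sender or relay, not a description of what the plugin does: nothing in this document states that the plugin compresses data, and the helper names prepare_payload and decode_payload are hypothetical. The receiver tells the two forms apart by the first byte, since a zlib stream produced by zlib.compress starts with 0x78 and valid JSON text never starts with that byte.

```python
import json
import zlib

# Largest UDP payload over IPv4: 65535 - 20 (IP header) - 8 (UDP header).
MAX_UDP_PAYLOAD = 65507


def prepare_payload(event, limit: int = MAX_UDP_PAYLOAD) -> bytes:
    """Serialize an event, compressing only if the plain form is too large."""
    raw = json.dumps(event, separators=(",", ":")).encode("utf-8")
    if len(raw) <= limit:
        return raw
    packed = zlib.compress(raw)
    if len(packed) > limit:
        raise ValueError("payload does not fit in one datagram even when compressed")
    return packed


def decode_payload(data: bytes):
    """Inverse of prepare_payload: decompress if needed, then parse JSON."""
    if data[:1] == b"\x78":  # zlib header byte; never the start of JSON text
        data = zlib.decompress(data)
    return json.loads(data.decode("utf-8"))
```

Small events pass through unchanged, so existing receivers that expect plain JSON keep working for the common case.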

Performance Optimization

  1. UDP Characteristics:

    • UDP is connectionless and does not guarantee delivery
    • No built-in error correction or retransmission
    • Lower latency than TCP but potential for packet loss
    • Best for real-time applications tolerating data loss
  2. Data Format:

    • JSON strings for udp:// scheme (human-readable)
    • Binary image data for udpimage:// scheme (efficient)
    • Keep data payloads within reasonable UDP limits
    • Consider data serialization efficiency
  3. Resource Management:

    • UDP sockets are created and closed for each transmission
    • No persistent connections maintained
    • Minimal server-side resource usage
    • Suitable for high-frequency, short-duration transmissions
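
The per-transmission socket lifecycle described under Resource Management can be imitated in a few lines of Python, which is handy for exercising a receiver without running the plugin. This is only a stand-in under that assumption; send_once is a hypothetical name and the plugin's internal implementation is C++.

```python
import socket


def send_once(payload: bytes, host: str, port: int) -> int:
    """Open a UDP socket, send one datagram, and close the socket again.

    Returns the number of bytes handed to the network stack. A successful
    return does not mean the datagram was received.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))


# Example (not executed here):
#   send_once(b'{"events": []}', "127.0.0.1", 5000)
```

Because no connection is kept open, each call is independent and a failure of one send does not affect the next.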

Error Handling

  1. Network Failures:

    • UDP transmission failures are reported but not retried
    • No automatic failover or retry mechanisms
    • Implement application-level monitoring if needed
    • Consider multiple destination handlers for redundancy
  2. Data Validation:

    • Receiving applications should validate UDP data
    • No built-in data integrity checks
    • Consider adding checksums or sequence numbers
    • Implement application-level error detection
  3. Troubleshooting:

    • Use network monitoring tools (tcpdump, Wireshark)
    • Check firewall rules for UDP traffic
    • Verify target applications are listening on specified ports
    • Monitor system logs for transmission errors
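
As a rough illustration of the checksum and sequence-number suggestion under Data Validation, the sketch below prefixes each payload with an 8-byte header. The header layout and the names frame and unframe are assumptions made for this example; the plugin does not add such a header, so both ends would have to be custom code (for instance a relay in front of the final consumer).

```python
import struct
import zlib

# Hypothetical header: 4-byte sequence number + 4-byte CRC32, network byte order.
HEADER = struct.Struct("!II")


def frame(seq: int, payload: bytes) -> bytes:
    """Prefix payload with its sequence number and CRC32 checksum."""
    return HEADER.pack(seq & 0xFFFFFFFF, zlib.crc32(payload)) + payload


def unframe(datagram: bytes):
    """Return (seq, payload), or raise ValueError if the datagram is damaged."""
    if len(datagram) < HEADER.size:
        raise ValueError("datagram shorter than header")
    seq, crc = HEADER.unpack_from(datagram)
    payload = datagram[HEADER.size:]
    if zlib.crc32(payload) != crc:
        raise ValueError("checksum mismatch")
    return seq, payload
```

A receiver can compare consecutive sequence numbers to spot lost or reordered datagrams, and drop any datagram for which unframe raises.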

Troubleshooting

Connection Issues

  1. "send(): failed (REQ)" error

    • Cause: Network connectivity issues or target unreachable
    • Solution:
      # Test UDP connectivity
      nc -u target-host 5000
      echo "test message" | nc -u target-host 5000
      
      # Check routing
      traceroute target-host
      ping target-host
      
    • Check: Firewall rules, network configuration, target availability
  2. No data received at target

    • Cause: Incorrect IP/port configuration or target not listening
    • Solution:
      # Test UDP port listening
      nc -l -u 5000  # Listen on UDP port 5000
      
      # Check if port is in use (-l includes bound, unconnected UDP sockets)
      netstat -uln | grep :5000
      ss -uln | grep :5000
      
    • Check: Target application configuration, port conflicts
  3. Intermittent transmission failures

    • Cause: Network congestion, packet loss, or target overload
    • Solution: Monitor network utilization, implement rate limiting
    • Check: Network performance, target processing capacity
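
Rate limiting, suggested above for intermittent failures, is often implemented as a token bucket. The Python sketch below shows the idea for a custom relay or receiver; it is not a plugin feature, and the class name and parameters are illustrative assumptions.

```python
import time


class TokenBucket:
    """Allow at most `rate` messages per second, with bursts up to `burst`."""

    def __init__(self, rate: float, burst: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = float(burst)
        self.tokens = float(burst)  # start with a full bucket
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        """Return True if one message may be sent now, consuming a token."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

Messages for which allow() returns False can be dropped or aggregated, which fits UDP's tolerance for loss better than queueing them indefinitely.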

Configuration Issues

  1. Handler creation fails

    • Cause: Invalid configuration parameters or network setup issues
    • Solution: Validate IP addresses and port numbers
    • Check:
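      -- Validate IPv4 address format, including the 0-255 range of each octet
      local ip = "192.168.1.100"
      local a, b, c, d = ip:match("^(%d+)%.(%d+)%.(%d+)%.(%d+)$")
      local valid = a ~= nil
      for _, octet in ipairs({a, b, c, d}) do
          if tonumber(octet) > 255 then valid = false end
      end
      if not valid then
          api.logging.LogError("Invalid IP address format")
      end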
      
      -- Validate port range
      local port = tonumber("5000")
      if not port or port < 1 or port > 65535 then
          api.logging.LogError("Invalid port number")
      end
      
  2. Incorrect data format

    • Cause: Sink data not in expected format for UDP transmission
    • Solution: Verify sink data structure and content
    • Check: For udpimage://, ensure sink data contains "image" field
  3. Performance degradation

    • Cause: High-frequency transmissions overwhelming network or target
    • Solution: Implement data throttling or selective transmission
    • Check: Monitor CPU usage, network utilization, target processing

Network Troubleshooting

  1. Packet Loss

    • Detection:
      # Monitor UDP statistics
      netstat -su | grep -i udp
      cat /proc/net/snmp | grep Udp
      
    • Mitigation: Reduce transmission frequency, increase the receiver's socket buffer sizes
    • Monitoring: Implement application-level sequence numbers
  2. Firewall Issues

    • Windows:
      # Check Windows firewall for outbound rules (look for "Protocol: UDP" in the output)
      netsh advfirewall firewall show rule name=all dir=out
      
      # Add UDP rule if needed (remoteport is the destination port)
      netsh advfirewall firewall add rule name="CVEDIA UDP Out" dir=out action=allow protocol=udp remoteport=5000
      
    • Linux:
      # Check iptables for UDP rules
      iptables -L OUTPUT -v -n | grep udp
      
      # Add UDP rule if needed
      iptables -A OUTPUT -p udp --dport 5000 -j ACCEPT
      
  3. Network Monitoring

    • Real-time monitoring:
      # Monitor UDP traffic with tcpdump
      tcpdump -i eth0 udp port 5000
      
      # Monitor UDP packets with specific host
      tcpdump -i any udp and host 192.168.1.100 and port 5000
      
    • Analysis: Use Wireshark for detailed packet analysis

Debugging Tools

  1. Network Testing:

    # Send test UDP packet
    echo "test" | nc -u 192.168.1.100 5000
    
    # Listen for UDP packets
    nc -l -u 5000
    
    # Test with timeout
    timeout 10 nc -l -u 5000
    

  2. Port Scanning:

    # Check if UDP port is open
    nmap -sU -p 5000 192.168.1.100
    
    # Scan range of UDP ports
    nmap -sU -p 5000-5010 192.168.1.100
    

  3. System Monitoring:

    # Monitor system UDP statistics
    watch -n 1 'cat /proc/net/snmp | grep Udp'
    
    # Monitor network interface statistics
    watch -n 1 'cat /proc/net/dev'
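
For scripted monitoring on Linux, the Udp counters shown by the commands above can be parsed in Python. The sketch assumes the usual /proc/net/snmp layout of one header line and one value line per protocol; parse_udp_counters is a hypothetical helper name.

```python
def parse_udp_counters(snmp_text: str) -> dict:
    """Parse the two 'Udp:' lines of /proc/net/snmp into {name: value}."""
    # "UdpLite:" lines do not start with "Udp:", so they are skipped.
    rows = [line.split()[1:] for line in snmp_text.splitlines()
            if line.startswith("Udp:")]
    if len(rows) != 2:
        raise ValueError("expected one header line and one value line for Udp")
    names, values = rows
    return dict(zip(names, map(int, values)))


# Example (Linux only, not executed here):
#   with open("/proc/net/snmp") as f:
#       counters = parse_udp_counters(f.read())
#   print(counters.get("InErrors"), counters.get("RcvbufErrors"))
```

A rising RcvbufErrors value on the receiving host usually indicates that the application is not draining its socket fast enough.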
    

Integration Examples

Simple UDP Server (Python)

#!/usr/bin/env python3
import socket
import json

class UDPReceiver:
    def __init__(self, host='0.0.0.0', port=5000):
        self.host = host
        self.port = port
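        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))
        # Time out periodically so the loop in start() can re-check self.running
        self.socket.settimeout(1.0)
        self.running = False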

    def start(self):
        self.running = True
        print(f"UDP Server listening on {self.host}:{self.port}")

        while self.running:
            try:
                data, addr = self.socket.recvfrom(65536)
                self.process_data(data.decode('utf-8'), addr)
            except socket.timeout:
                continue
            except Exception as e:
                print(f"Error receiving data: {e}")

    def process_data(self, data, address):
        try:
            # Try to parse as JSON
            json_data = json.loads(data)
            print(f"Received JSON from {address}: {json_data}")
            # Process analytics data here
        except json.JSONDecodeError:
            # Handle as plain text
            print(f"Received text from {address}: {data}")

    def stop(self):
        self.running = False
        self.socket.close()

if __name__ == "__main__":
    server = UDPReceiver('0.0.0.0', 5000)
    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()

UDP Image Receiver (Python)

#!/usr/bin/env python3
import socket
import numpy as np
import cv2

class UDPImageReceiver:
    def __init__(self, host='0.0.0.0', port=6000):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))

    def start(self):
        print(f"UDP Image Server listening on {self.host}:{self.port}")

        while True:
            try:
                data, addr = self.socket.recvfrom(65536)
                self.process_image(data, addr)
            except Exception as e:
                print(f"Error receiving image: {e}")

    def process_image(self, data, address):
        try:
            # Convert bytes to numpy array
            nparr = np.frombuffer(data, np.uint8)
            # Decode image
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            if img is not None:
                print(f"Received image from {address}: {img.shape}")
                # Process image here
                cv2.imshow('UDP Image', img)
                cv2.waitKey(1)
            else:
                print(f"Failed to decode image from {address}")
        except Exception as e:
            print(f"Error processing image: {e}")

if __name__ == "__main__":
    receiver = UDPImageReceiver('0.0.0.0', 6000)
    receiver.start()
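
This document does not state which encoding the udpimage:// payload uses, so a receiver like the one above may fail to decode it. As a diagnostic aid, the following sketch checks the first bytes of a datagram against a few well-known file signatures; sniff_image_format is a hypothetical helper, and an "unknown" result may simply mean the payload is a raw pixel buffer.

```python
def sniff_image_format(data: bytes) -> str:
    """Guess the container format of an image payload from its magic bytes."""
    if data.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if data.startswith(b"BM"):
        return "bmp"
    return "unknown"
```

Logging the result next to the datagram length makes it easier to tell an unsupported encoding apart from a truncated transmission.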

Node.js UDP Server

const dgram = require('dgram');
const server = dgram.createSocket('udp4');

server.on('error', (err) => {
  console.log(`Server error:\n${err.stack}`);
  server.close();
});

server.on('message', (msg, rinfo) => {
  try {
    // Try to parse as JSON
    const jsonData = JSON.parse(msg.toString());
    console.log(`Received JSON from ${rinfo.address}:${rinfo.port}`, jsonData);

    // Process CVEDIA-RT analytics data
    if (jsonData.events) {
      jsonData.events.forEach(event => {
        console.log(`Event: ${event.type}, Confidence: ${event.confidence}`);
      });
    }
  } catch (err) {
    // Handle as plain text
    console.log(`Received text from ${rinfo.address}:${rinfo.port}: ${msg}`);
  }
});

server.on('listening', () => {
  const address = server.address();
  console.log(`UDP Server listening on ${address.address}:${address.port}`);
});

server.bind(5000);

See Also