TopBus Plugin

Description

The TopBus plugin provides message bus communication through the Output plugin system. It registers the topbus:// URI scheme and acts as an output handler for inter-process communication, event distribution, and data exchange between CVEDIA-RT components. It supports shared memory transport and integrates with VANA (Video Analytics) data structures, including PTZ (Pan-Tilt-Zoom) camera control and idle detection.

Key Features

  • High-Performance Message Bus: Advanced messaging system with shared memory support
  • Output Handler Integration: Registered topbus:// URI scheme with Output plugin
  • Multi-Sink Support: Handles six sink types (source_info, eventsExport, locked_tracks, object_left_removed_motion_tracks, frame_processed, ptz_idle_request)
  • VANA Data Structure Integration: Enhanced integration with Video Analytics data formats
  • PTZ Camera Control: Specialized handling of PTZ camera control and idle detection
  • Shared Memory Communication: High-throughput data exchange using shared memory
  • Event Distribution: Efficient event routing between system components
  • Track Management: Advanced tracking data management and movement events
  • Frame Processing Coordination: Synchronized frame processing workflows
  • Singleton Architecture: Centralized TopBusCore instance for system-wide coordination

When to Use

  • High-performance inter-process communication within CVEDIA-RT
  • Advanced event distribution with shared memory optimization
  • Integration with VANA (Video Analytics) systems requiring high throughput
  • PTZ camera control and coordination workflows
  • Performance-critical messaging for real-time applications
  • Large-scale data exchange between distributed components
  • Specialized messaging for enterprise CVEDIA-RT deployments

Requirements

Software Dependencies

  • CVEDIA-RT core system
  • TopBus core libraries and messaging infrastructure
  • Shared memory support (OS-level)
  • VANA data structure support (if using analytics features)
  • PTZ camera control libraries (if using PTZ features)

System Requirements

  • Sufficient shared memory for high-throughput messaging
  • CPU resources for message routing and processing
  • Memory for message buffering and shared memory segments
  • Inter-process communication capabilities
  • PTZ camera hardware support (if using PTZ features)

Configuration

The TopBus plugin is configured through the Output plugin system using the topbus:// URI scheme.

Basic Configuration

{
  "output": {
    "handlers": {
      "topbus-events": {
        "uri": "topbus://analytics-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "analytics",
          "useSharedMemory": true,
          "bufferSize": 2000
        }
      }
    }
  }
}

Advanced Multi-Sink Configuration

{
  "output": {
    "handlers": {
      "topbus-source-info": {
        "uri": "topbus://source-coordinator",
        "sink": "source_info",
        "enabled": true,
        "config": {
          "messageType": "source_management",
          "useSharedMemory": true
        }
      },
      "topbus-events": {
        "uri": "topbus://events-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "analytics",
          "useSharedMemory": true,
          "enableVanaFormat": true,
          "bufferSize": 3000
        }
      },
      "topbus-tracking": {
        "uri": "topbus://tracking-coordinator",
        "sink": "locked_tracks",
        "enabled": true,
        "config": {
          "messageType": "tracking",
          "useSharedMemory": true,
          "trackingOptimized": true
        }
      },
      "topbus-ptz": {
        "uri": "topbus://ptz-controller",
        "sink": "ptz_idle_request",
        "enabled": true,
        "config": {
          "messageType": "ptz_control",
          "ptzEnabled": true,
          "idleDetection": true
        }
      }
    }
  }
}

High-Performance Configuration

{
  "output": {
    "handlers": {
      "topbus-high-perf": {
        "uri": "topbus://high-performance-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "high_performance",
          "useSharedMemory": true,
          "bufferSize": 5000,
          "sharedMemorySize": "64MB",
          "enableVanaFormat": true,
          "trackingEnabled": true,
          "ptzEnabled": true,
          "performanceMode": "maximum",
          "messageCompression": true,
          "priorityQueue": true
        }
      }
    }
  }
}

Configuration Schema

Parameter                   Type     Default     Description
uri                         string   required    TopBus URI (topbus://path)
sink                        string   required    Specific sink type for TopBus processing
enabled                     boolean  true        Enable/disable TopBus output
config.messageType          string   "generic"   Type of messages to handle
config.useSharedMemory      boolean  true        Enable shared memory communication
config.bufferSize           integer  2000        Message buffer size
config.sharedMemorySize     string   "32MB"      Shared memory segment size
config.enableVanaFormat     boolean  false       Enable VANA data structure format
config.trackingEnabled      boolean  true        Enable tracking data processing
config.ptzEnabled           boolean  false       Enable PTZ camera control features
config.idleDetection        boolean  false       Enable PTZ idle detection
config.performanceMode      string   "balanced"  Performance mode (balanced, maximum, efficient)
config.messageCompression   boolean  false       Enable message compression
config.priorityQueue        boolean  false       Enable priority-based message queuing
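
The schema gives sharedMemorySize as a string such as "32MB", while TopBusCore::initializeSharedMemory takes a byte count. How the plugin converts between the two is not documented here. The sketch below shows one plausible conversion. The function name parseSizeString, the accepted suffixes (B, KB, MB, GB), and the use of binary multiples (1 MB = 1024 × 1024 bytes) are all assumptions made for illustration.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper (not part of the TopBus API): converts strings such as
// "64MB" to a byte count. Assumes binary multiples and returns 0 for input
// it cannot parse.
inline std::uint64_t parseSizeString(const std::string& text) {
    std::size_t i = 0;
    std::uint64_t value = 0;

    // Read the leading decimal digits.
    while (i < text.size() && std::isdigit(static_cast<unsigned char>(text[i]))) {
        value = value * 10 + static_cast<std::uint64_t>(text[i] - '0');
        ++i;
    }
    if (i == 0) {
        return 0;  // no leading digits
    }

    // Upper-case the remaining characters so "mb" and "MB" are equivalent.
    std::string unit;
    for (; i < text.size(); ++i) {
        unit += static_cast<char>(std::toupper(static_cast<unsigned char>(text[i])));
    }

    if (unit.empty() || unit == "B") return value;
    if (unit == "KB") return value << 10;
    if (unit == "MB") return value << 20;
    if (unit == "GB") return value << 30;
    return 0;  // unknown suffix
}
```

Under these assumptions, "64MB" maps to 67,108,864 bytes, which is the kind of value a caller would pass to initializeSharedMemory.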

URI Scheme

The TopBus plugin registers the topbus:// URI scheme with the Output plugin system.

URI Format

topbus://[path][?options]

Components:

  • path: Optional path identifier for message routing and hub identification
  • options: URL-encoded query parameters for configuration

URI Examples

topbus://analytics-hub              # Analytics data hub
topbus://high-performance-comm      # High-performance communication
topbus://tracking-coordinator       # Tracking coordination hub
topbus://vana-integration           # VANA system integration
topbus://ptz-controller             # PTZ camera control hub
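
To make the topbus://[path][?options] format concrete, the following sketch splits a URI into its path and its key=value options. It is an illustration only: TopBusUri and parseTopBusUri are hypothetical names, the plugin's own parser is not shown in this document, and the sketch leaves option values URL-encoded rather than decoding percent-escapes.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Hypothetical result type; not part of the TopBus API.
struct TopBusUri {
    std::string path;                            // e.g. "analytics-hub"
    std::map<std::string, std::string> options;  // raw, still URL-encoded values
};

// Splits "topbus://[path][?options]" into path and options.
// Returns false (leaving `out` untouched) when the scheme prefix is missing.
inline bool parseTopBusUri(const std::string& uri, TopBusUri& out) {
    const std::string scheme = "topbus://";
    if (uri.compare(0, scheme.size(), scheme) != 0) {
        return false;
    }

    TopBusUri result;
    const std::string rest = uri.substr(scheme.size());
    const std::size_t queryPos = rest.find('?');
    result.path = rest.substr(0, queryPos);  // npos means "up to the end"

    if (queryPos != std::string::npos) {
        // Walk the query string one "key=value" pair at a time.
        const std::string query = rest.substr(queryPos + 1);
        std::size_t start = 0;
        while (start <= query.size()) {
            std::size_t end = query.find('&', start);
            if (end == std::string::npos) end = query.size();
            const std::string pair = query.substr(start, end - start);
            if (!pair.empty()) {
                const std::size_t eq = pair.find('=');
                const std::string key = pair.substr(0, eq);
                const std::string value =
                    (eq == std::string::npos) ? "" : pair.substr(eq + 1);
                result.options[key] = value;
            }
            start = end + 1;
        }
    }

    out = result;
    return true;
}
```

For "topbus://analytics-hub" the sketch yields the path "analytics-hub" and an empty option map. A URI with a different scheme is rejected.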

Supported Sink Types

The TopBus plugin handles specific sink types with specialized high-performance processing:

Source Information (source_info)

  • Purpose: Initialize or update triggers based on source data
  • Processing: High-performance source configuration and trigger setup
  • Data Format: Source metadata and configuration information
  • Optimization: Shared memory for rapid configuration updates

Events Export (eventsExport)

  • Purpose: Process analytics events for high-throughput distribution
  • Processing: Prepares VANA data structure with shared memory optimization
  • Data Format: Structured analytics events and detections
  • Optimization: Bulk processing and shared memory transfers

Locked Tracks (locked_tracks)

  • Purpose: Manage persistent tracking data with high performance
  • Processing: Adds locked tracks to VANA data structure using shared memory
  • Data Format: Track objects with locked/confirmed status
  • Optimization: Efficient track state management

Movement Tracks (object_left_removed_motion_tracks)

  • Purpose: Handle object movement and state changes with low latency
  • Processing: Tracks from object left/removed triggers with shared memory
  • Data Format: Movement tracking data and object state changes
  • Optimization: Real-time movement tracking updates

Frame Processed (frame_processed)

  • Purpose: Coordinate frame processing completion with synchronization
  • Processing: Processes VANA data and clears temporary data efficiently
  • Data Format: Frame processing status and cleanup signals
  • Optimization: Synchronized frame processing coordination

PTZ Idle Request (ptz_idle_request)

  • Purpose: PTZ camera control and idle detection
  • Processing: Checks PTZ movement status and sets idle flags
  • Data Format: PTZ status, movement detection, idle state
  • Optimization: Real-time PTZ control coordination
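
One way to picture how a sink name selects its processing path is a lookup table from sink name to handler. The sketch below is a simplified model and not the plugin's implementation: SinkRouter, SinkPayload, and makeDocumentedSinkRouter are hypothetical names, the payload is a plain string where the plugin passes a pCValue, and the handlers are placeholders that only report success.

```cpp
#include <functional>
#include <initializer_list>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the sink payload; the real plugin passes a pCValue.
using SinkPayload = std::string;
using SinkHandler = std::function<bool(const SinkPayload&)>;

class SinkRouter {
public:
    // Registers a handler for one sink name, replacing any previous handler.
    void registerSink(const std::string& sink, SinkHandler handler) {
        handlers_[sink] = std::move(handler);
    }

    // Reports whether a handler is registered for `sink`.
    bool handles(const std::string& sink) const {
        return handlers_.count(sink) != 0;
    }

    // Invokes the handler for `sink`; returns false for unknown sinks.
    bool dispatch(const std::string& sink, const SinkPayload& data) const {
        const auto it = handlers_.find(sink);
        if (it == handlers_.end()) {
            return false;
        }
        return it->second(data);
    }

private:
    std::unordered_map<std::string, SinkHandler> handlers_;
};

// Builds a router for the six sink names listed above, each with a
// placeholder handler that accepts every payload.
inline SinkRouter makeDocumentedSinkRouter() {
    SinkRouter router;
    for (const char* sink : {"source_info", "eventsExport", "locked_tracks",
                             "object_left_removed_motion_tracks",
                             "frame_processed", "ptz_idle_request"}) {
        router.registerSink(sink, [](const SinkPayload&) { return true; });
    }
    return router;
}
```

Rejecting unknown sink names at dispatch time means a misspelled sink in a handler configuration fails visibly instead of being silently ignored.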

API Reference

C++ API

The TopBus plugin implements the iface::OutputHandler interface with high-performance messaging capabilities:

class TopBusOutputHandler : public iface::OutputHandler {
public:
    // Constructor
    TopBusOutputHandler(const std::string& moduleName,
                       const std::string& schema, 
                       const std::string& sink,
                       const std::string& path, 
                       const pCValue& config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // High-performance message operations
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;

    // Handler information
    std::string getSink() override;
};

TopBus Core Classes

// Singleton high-performance message bus core
class TopBusCore {
public:
    // Singleton access
    static TopBusCore* getInstance();

    // High-performance sink handlers
    expected<bool> handleSourceSink(pCValue sinkData);
    expected<bool> handleEventsExportSink(pCValue sinkData);
    expected<bool> handleLockedTracksSink(pCValue sinkData);
    expected<bool> handleMovementTracksSink(pCValue sinkData);
    expected<bool> handleFrameProcessedSink(pCValue sinkData);
    expected<bool> handlePTZIdleSink(pCValue sinkData);

    // Shared memory operations
    bool initializeSharedMemory(size_t size);
    bool writeToSharedMemory(const std::string& topic, pCValue data);
    pCValue readFromSharedMemory(const std::string& topic);
    void cleanupSharedMemory();

    // High-performance messaging
    bool publishMessage(const std::string& topic, pCValue data, bool useSharedMemory = true);
    bool subscribeToTopic(const std::string& topic, MessageHandler handler);
    void unsubscribeFromTopic(const std::string& topic);

    // VANA integration with performance optimization
    bool prepareVanaData(pCValue eventData);
    void clearTemporaryData();
    bool optimizeVanaProcessing();

    // PTZ control integration
    bool updatePTZStatus(pCValue ptzData);
    bool checkPTZIdle();
    void setPTZIdleFlag(bool idle);
};

// Managed TopBus interface with high-performance features
class TopBusManaged : public iface::TopBus {
public:
    // Factory method
    static std::unique_ptr<iface::TopBus> create(const std::string& moduleName);

    // High-performance message operations
    bool sendMessage(const std::string& destination, pCValue message, bool useSharedMemory = true);
    pCValue receiveMessage(const std::string& source);
    bool broadcast(pCValue message, bool useSharedMemory = true);

    // Subscription management
    bool subscribe(const std::string& topic);
    bool unsubscribe(const std::string& topic);

    // Performance monitoring
    size_t getMessageQueueSize();
    double getMessageThroughput();
    size_t getSharedMemoryUsage();
};

// Shared memory utilities
class TV_SHM {
public:
    // Shared memory segment management
    bool createSegment(const std::string& name, size_t size);
    bool attachSegment(const std::string& name);
    void* getSegmentPointer();
    bool detachSegment();
    void destroySegment();

    // Data operations
    bool writeData(const void* data, size_t size, size_t offset = 0);
    bool readData(void* buffer, size_t size, size_t offset = 0);
    size_t getSegmentSize();
};
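
The writeData and readData signatures above take a size and an offset, so a caller has to keep both within the segment. The sketch below models that contract with an in-process buffer. FakeSegment is a hypothetical stand-in, not TV_SHM itself, and whether TV_SHM performs the same bounds checks is not documented here.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical in-process stand-in for a shared memory segment.
class FakeSegment {
public:
    explicit FakeSegment(std::size_t size) : bytes_(size, 0) {}

    // Copies `size` bytes to `offset`; refuses writes that would run past
    // the end of the segment.
    bool writeData(const void* data, std::size_t size, std::size_t offset = 0) {
        if (data == nullptr || !fits(size, offset)) return false;
        if (size == 0) return true;
        std::memcpy(bytes_.data() + offset, data, size);
        return true;
    }

    // Copies `size` bytes starting at `offset` into `buffer`, with the same
    // bounds check as writeData.
    bool readData(void* buffer, std::size_t size, std::size_t offset = 0) const {
        if (buffer == nullptr || !fits(size, offset)) return false;
        if (size == 0) return true;
        std::memcpy(buffer, bytes_.data() + offset, size);
        return true;
    }

    std::size_t getSegmentSize() const { return bytes_.size(); }

private:
    // Phrased as a subtraction so `offset + size` cannot overflow.
    bool fits(std::size_t size, std::size_t offset) const {
        return offset <= bytes_.size() && size <= bytes_.size() - offset;
    }

    std::vector<unsigned char> bytes_;
};
```

With a 16-byte segment, a 6-byte write at offset 4 succeeds, while the same write at offset 12 is rejected because it would end at byte 18.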

Lua API

The TopBus plugin is accessed either through the Output plugin system or through a standalone factory:

-- Via Output plugin system (recommended for high-performance data distribution)
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusCoordinator")

-- Add TopBus output handler for high-performance analytics
local topbusHandler = output:addHandler(
    "topbus-analytics",
    "topbus://analytics-hub",
    "eventsExport",
    {
        messageType = "analytics",
        useSharedMemory = true,
        bufferSize = 3000,
        enableVanaFormat = true
    }
)

-- Add TopBus handler for PTZ control
local ptzHandler = output:addHandler(
    "topbus-ptz",
    "topbus://ptz-controller",
    "ptz_idle_request",
    {
        messageType = "ptz_control",
        ptzEnabled = true,
        idleDetection = true
    }
)

-- Standalone TopBus client (for direct high-performance messaging)
local topbus = api.factory.topbus.create("TopBusClient")

-- High-performance message operations
topbus:sendMessage("destination", messageData, true)  -- true = use shared memory
local receivedMessage = topbus:receiveMessage("source")
topbus:broadcast(broadcastData, true)  -- true = use shared memory

-- Subscription management
topbus:subscribe("analytics/events")
topbus:unsubscribe("analytics/events")

-- Performance monitoring
local queueSize = topbus:getMessageQueueSize()
local throughput = topbus:getMessageThroughput()
local memoryUsage = topbus:getSharedMemoryUsage()

Note: TopBus operates as both a high-performance output handler integrated with the Output plugin system and a standalone messaging interface for direct inter-process communication with shared memory optimization.

Examples

High-Performance Analytics Distribution

-- Create output instance for TopBus high-performance event distribution
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusHighPerfDistributor")

-- Configure TopBus for high-throughput analytics
local highPerfConfig = {
    messageType = "high_performance_analytics",
    useSharedMemory = true,
    bufferSize = 5000,
    sharedMemorySize = "64MB",
    enableVanaFormat = true,
    performanceMode = "maximum",
    messageCompression = true
}

local highPerfHandler = output:addHandler(
    "topbus-high-perf",
    "topbus://high-performance-analytics",
    "eventsExport",
    highPerfConfig
)

if highPerfHandler then
    api.logging.LogInfo("TopBus high-performance analytics configured")
    api.logging.LogInfo("Hub: high-performance-analytics")
    api.logging.LogInfo("Shared Memory: 64MB")
    api.logging.LogInfo("Performance Mode: Maximum")
    api.logging.LogInfo("Message Compression: Enabled")

    -- Analytics events will be distributed through shared memory
    -- for maximum throughput and minimal latency
else
    api.logging.LogError("Failed to create TopBus high-performance handler")
end

Multi-Component High-Performance Coordination

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusAdvancedCoordinator")

-- Configure multiple TopBus handlers for different high-performance workflows
local topbusHandlers = {
    {
        name = "source-coordinator",
        uri = "topbus://source-management",
        sink = "source_info",
        config = {
            messageType = "source_coordination",
            useSharedMemory = true,
            bufferSize = 1000
        }
    },
    {
        name = "analytics-distributor",
        uri = "topbus://analytics-hub", 
        sink = "eventsExport",
        config = {
            messageType = "analytics",
            useSharedMemory = true,
            bufferSize = 4000,
            enableVanaFormat = true,
            performanceMode = "maximum"
        }
    },
    {
        name = "tracking-manager",
        uri = "topbus://tracking-coordinator",
        sink = "locked_tracks",
        config = {
            messageType = "tracking",
            useSharedMemory = true,
            trackingOptimized = true,
            bufferSize = 2000
        }
    },
    {
        name = "movement-tracker",
        uri = "topbus://movement-coordinator",
        sink = "object_left_removed_motion_tracks",
        config = {
            messageType = "movement_tracking",
            useSharedMemory = true,
            realTimeOptimized = true
        }
    },
    {
        name = "frame-processor",
        uri = "topbus://frame-sync",
        sink = "frame_processed",
        config = {
            messageType = "frame_coordination",
            useSharedMemory = true,
            syncOptimized = true
        }
    },
    {
        name = "ptz-controller",
        uri = "topbus://ptz-control",
        sink = "ptz_idle_request",
        config = {
            messageType = "ptz_control",
            ptzEnabled = true,
            idleDetection = true,
            realTimeControl = true
        }
    }
}

-- Create all TopBus high-performance coordination handlers
local activeHandlers = {}
for _, handler in ipairs(topbusHandlers) do
    local topbusHandler = output:addHandler(
        handler.name,
        handler.uri,
        handler.sink,
        handler.config
    )

    if topbusHandler then
        table.insert(activeHandlers, handler.name)
        api.logging.LogInfo("TopBus handler created: " .. handler.name)
        api.logging.LogInfo("  URI: " .. handler.uri)
        api.logging.LogInfo("  Sink: " .. handler.sink)
        api.logging.LogInfo("  Shared Memory: " .. (handler.config.useSharedMemory and "Enabled" or "Disabled"))
        api.logging.LogInfo("  Performance: " .. (handler.config.performanceMode or "Standard"))
    else
        api.logging.LogError("Failed to create TopBus handler: " .. handler.name)
    end
end

api.logging.LogInfo("Active TopBus high-performance handlers: " .. #activeHandlers)

PTZ Camera Integration

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusPTZIntegration")

-- Configure TopBus for PTZ camera control and coordination
local ptzConfig = {
    messageType = "ptz_coordination",
    ptzEnabled = true,
    idleDetection = true,
    realTimeControl = true,
    useSharedMemory = true,
    bufferSize = 500
}

local ptzHandler = output:addHandler(
    "ptz-coordinator",
    "topbus://ptz-control-hub",
    "ptz_idle_request",
    ptzConfig
)

-- Also configure analytics coordination with PTZ
local analyticsWithPTZHandler = output:addHandler(
    "analytics-ptz-coord",
    "topbus://analytics-ptz-coordination",
    "eventsExport",
    {
        messageType = "analytics_with_ptz",
        useSharedMemory = true,
        enableVanaFormat = true,
        ptzIntegration = true,
        bufferSize = 2000
    }
)

if ptzHandler and analyticsWithPTZHandler then
    api.logging.LogInfo("TopBus PTZ integration configured")
    api.logging.LogInfo("PTZ Control Hub: ptz-control-hub")
    api.logging.LogInfo("Analytics-PTZ Coordination: analytics-ptz-coordination")
    api.logging.LogInfo("Idle Detection: Enabled")
    api.logging.LogInfo("Real-time Control: Enabled")

    -- Create the monitor client and subscribe once, so repeated polls reuse it
    local ptzMonitor = api.factory.topbus.create("PTZMonitor")
    ptzMonitor:subscribe("ptz/status")

    -- Function to monitor PTZ status (call it periodically, e.g. from onTick)
    function monitorPTZStatus()
        local ptzStatus = ptzMonitor:receiveMessage("ptz/status")
        if ptzStatus then
            api.logging.LogInfo("PTZ Status Update:")
            api.logging.LogInfo("  Position: " .. tostring(ptzStatus.position or "unknown"))
            api.logging.LogInfo("  Moving: " .. (ptzStatus.moving and "Yes" or "No"))
            api.logging.LogInfo("  Idle: " .. (ptzStatus.idle and "Yes" or "No"))
        end
    end
else
    api.logging.LogError("Failed to create TopBus PTZ integration")
end

VANA Integration with Shared Memory

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusVANAIntegration")

-- Configure TopBus for high-performance VANA system integration
local vanaConfig = {
    messageType = "vana_analytics",
    useSharedMemory = true,
    bufferSize = 8000,
    sharedMemorySize = "128MB",
    enableVanaFormat = true,
    trackingEnabled = true,
    performanceMode = "maximum",
    messageCompression = true,
    priorityQueue = true,
    eventFiltering = {
        enabled = true,
        allowedTypes = {"intrusion", "line_crossing", "object_detection", "abandoned_object", "crowd_detection"}
    }
}

local vanaHandler = output:addHandler(
    "vana-high-perf",
    "topbus://vana-high-performance",
    "eventsExport",
    vanaConfig
)

-- Configure additional VANA workflow handlers
local vanaWorkflowHandlers = {
    {
        name = "vana-tracking",
        uri = "topbus://vana-tracking-hub",
        sink = "locked_tracks",
        config = {
            messageType = "vana_tracking",
            useSharedMemory = true,
            enableVanaFormat = true,
            trackingOptimized = true
        }
    },
    {
        name = "vana-movement",
        uri = "topbus://vana-movement-hub",
        sink = "object_left_removed_motion_tracks",
        config = {
            messageType = "vana_movement",
            useSharedMemory = true,
            enableVanaFormat = true,
            realTimeOptimized = true
        }
    },
    {
        name = "vana-frame-sync",
        uri = "topbus://vana-frame-sync",
        sink = "frame_processed", 
        config = {
            messageType = "vana_frame_sync",
            useSharedMemory = true,
            enableVanaFormat = true,
            syncOptimized = true
        }
    }
}

-- Create VANA workflow handlers
local vanaHandlers = {}
for _, config in ipairs(vanaWorkflowHandlers) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(vanaHandlers, config.name)
        api.logging.LogInfo("VANA handler created: " .. config.name)
    end
end

if vanaHandler and #vanaHandlers == #vanaWorkflowHandlers then
    api.logging.LogInfo("Complete VANA high-performance workflow established")
    api.logging.LogInfo("Shared Memory: 128MB")
    api.logging.LogInfo("Performance Mode: Maximum")
    api.logging.LogInfo("Message Compression: Enabled")
    api.logging.LogInfo("Priority Queue: Enabled")
    api.logging.LogInfo("Event Filtering: Advanced")
end

Enterprise-Scale Deployment

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusEnterpriseDeployment")

-- Configure TopBus for enterprise-scale deployment
local enterpriseConfigs = {
    {
        name = "enterprise-analytics",
        uri = "topbus://enterprise-analytics-hub",
        sink = "eventsExport",
        config = {
            messageType = "enterprise_analytics",
            useSharedMemory = true,
            bufferSize = 10000,
            sharedMemorySize = "256MB",
            enableVanaFormat = true,
            performanceMode = "maximum",
            messageCompression = true,
            priorityQueue = true,
            enterpriseFeatures = true
        }
    },
    {
        name = "enterprise-tracking",
        uri = "topbus://enterprise-tracking-hub",
        sink = "locked_tracks",
        config = {
            messageType = "enterprise_tracking",
            useSharedMemory = true,
            bufferSize = 5000,
            trackingOptimized = true,
            enterpriseScale = true
        }
    },
    {
        name = "enterprise-ptz",
        uri = "topbus://enterprise-ptz-hub",
        sink = "ptz_idle_request", 
        config = {
            messageType = "enterprise_ptz",
            ptzEnabled = true,
            idleDetection = true,
            realTimeControl = true,
            enterpriseCoordination = true
        }
    }
}

-- Create enterprise-scale handlers
local enterpriseHandlers = {}
for _, config in ipairs(enterpriseConfigs) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(enterpriseHandlers, config.name)
        api.logging.LogInfo("Enterprise handler created: " .. config.name)
        api.logging.LogInfo("  Shared Memory: " .. (config.config.sharedMemorySize or "Standard"))
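        -- bufferSize is not set for every handler (e.g. enterprise-ptz), so guard against nil
        api.logging.LogInfo("  Buffer Size: " .. tostring(config.config.bufferSize or "Default"))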
        api.logging.LogInfo("  Performance Mode: " .. (config.config.performanceMode or "Standard"))
    end
end

-- Monitor enterprise deployment performance
function monitorEnterprisePerformance()
    local topbus = api.factory.topbus.create("EnterpriseMonitor")

    api.logging.LogInfo("Enterprise TopBus Performance Status:")
    api.logging.LogInfo("  Active Handlers: " .. #enterpriseHandlers)
    api.logging.LogInfo("  Queue Size: " .. topbus:getMessageQueueSize())
    api.logging.LogInfo("  Throughput: " .. string.format("%.2f msg/sec", topbus:getMessageThroughput()))
    api.logging.LogInfo("  Shared Memory Usage: " .. string.format("%.2f MB", topbus:getSharedMemoryUsage() / (1024*1024)))
end

api.logging.LogInfo("Enterprise TopBus deployment configured: " .. #enterpriseHandlers .. " handlers")

Standalone High-Performance Messaging

-- For direct TopBus high-performance messaging (not through Output plugin)
local instance = api.thread.getCurrentInstance()  -- used below for instance:getName()
local topbus = api.factory.topbus.create("DirectTopBusClient")

-- Subscribe to high-performance analytics events
topbus:subscribe("analytics/high-perf-events")
topbus:subscribe("tracking/real-time-updates")
topbus:subscribe("ptz/control-commands")
topbus:subscribe("system/performance-metrics")

-- High-performance message processing function
function processTopBusMessages()
    -- Check for high-performance analytics events
    local analyticsMessage = topbus:receiveMessage("analytics/high-perf-events")
    if analyticsMessage then
        api.logging.LogInfo("High-performance analytics event:")
        api.logging.LogInfo("  Type: " .. analyticsMessage.type)
        api.logging.LogInfo("  Confidence: " .. analyticsMessage.confidence)
        api.logging.LogInfo("  Processing Time: " .. (analyticsMessage.processing_time or "N/A") .. "ms")
    end

    -- Check for real-time tracking updates
    local trackingMessage = topbus:receiveMessage("tracking/real-time-updates")
    if trackingMessage then
        api.logging.LogInfo("Real-time tracking update:")
        api.logging.LogInfo("  Track ID: " .. trackingMessage.track_id)
        api.logging.LogInfo("  Status: " .. trackingMessage.status)
        api.logging.LogInfo("  Position: (" .. trackingMessage.x .. ", " .. trackingMessage.y .. ")")
    end

    -- Check for PTZ control commands
    local ptzMessage = topbus:receiveMessage("ptz/control-commands")
    if ptzMessage then
        api.logging.LogInfo("PTZ control command:")
        api.logging.LogInfo("  Command: " .. ptzMessage.command)
        api.logging.LogInfo("  Camera ID: " .. ptzMessage.camera_id)
        api.logging.LogInfo("  Parameters: " .. (ptzMessage.parameters or "None"))
    end

    -- Check for system performance metrics
    local perfMessage = topbus:receiveMessage("system/performance-metrics")
    if perfMessage then
        api.logging.LogInfo("Performance metrics:")
        api.logging.LogInfo("  Throughput: " .. perfMessage.throughput .. " msg/sec")
        api.logging.LogInfo("  Latency: " .. perfMessage.latency .. "ms")
        api.logging.LogInfo("  Memory Usage: " .. perfMessage.memory_usage .. "MB")
    end
end

-- Send high-performance status updates
function sendHighPerfStatus(status)
    local statusMessage = {
        timestamp = os.time(),
        component = "analytics-engine",
        instance_id = instance:getName(),
        status = status,
        performance_metrics = {
            -- fps = status.fps,  -- Use from provided status
            -- memory_usage = status.memory_usage,  -- Use from provided status
            -- cpu_usage = status.cpu_usage,  -- Use from provided status
            shared_memory_usage = topbus:getSharedMemoryUsage(),
            message_throughput = topbus:getMessageThroughput()
        }
    }

    -- Use shared memory for high-performance transmission
    topbus:sendMessage("system/performance-status", api.cvalue.create(statusMessage), true)
end

-- Broadcast high-priority alerts with shared memory
function broadcastHighPriorityAlert(alertType, message)
    local alert = {
        timestamp = os.time(),
        type = alertType,
        source = instance:getName(),
        message = message,
        priority = "critical",
        requires_immediate_action = true
    }

    -- Use shared memory for fastest delivery
    topbus:broadcast(api.cvalue.create(alert), true)
end

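-- Main high-performance processing loop
local STATUS_INTERVAL_SEC = 30
local lastStatusTime = 0
local lastAlertTime = 0

function onTick()
    processTopBusMessages()

    local now = os.time()

    -- Send performance status updates every 30 seconds.
    -- Comparing against the last send time avoids sending on every tick
    -- within the same second, or missing a second that no tick lands on.
    if now - lastStatusTime >= STATUS_INTERVAL_SEC then
        lastStatusTime = now
        sendHighPerfStatus("running")
    end

    -- Monitor performance and alert if degraded (at most once per interval)
    local throughput = topbus:getMessageThroughput()
    if throughput < 1000 and now - lastAlertTime >= STATUS_INTERVAL_SEC then  -- Below expected threshold
        lastAlertTime = now
        broadcastHighPriorityAlert("performance_degradation", "TopBus throughput below threshold: " .. throughput .. " msg/sec")
    end
end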

Best Practices

High-Performance Architecture

  1. Shared Memory Utilization:

    • Enable shared memory for high-throughput scenarios
    • Configure appropriate shared memory segment sizes
    • Monitor shared memory usage and fragmentation
    • Implement proper shared memory cleanup
  2. Performance Mode Selection:

    • Use "maximum" performance mode for critical applications
    • Use "balanced" for general-purpose deployments
    • Use "efficient" for resource-constrained environments
    • Monitor performance impact of different modes
  3. Message Optimization:

    • Enable message compression for large data volumes
    • Use priority queues for critical message types
    • Implement message filtering to reduce unnecessary traffic
    • Monitor message queue sizes and processing rates

Sink Selection and Optimization

  1. Event Distribution:

    • Use eventsExport for high-throughput analytics events
    • Enable VANA format for advanced analytics integration
    • Configure appropriate buffer sizes for event volume
    • Monitor event processing latency
  2. Tracking Management:

    • Use locked_tracks for persistent tracking data
    • Use object_left_removed_motion_tracks for movement events
    • Enable tracking optimization for better performance
    • Implement track state management
  3. PTZ Integration:

    • Use ptz_idle_request for PTZ camera control
    • Enable idle detection for AI coordination
    • Implement real-time PTZ control
    • Monitor PTZ status and responsiveness

Resource Management

  1. Memory Management:

    • Configure shared memory sizes based on data volume
    • Monitor memory usage and implement cleanup
    • Use appropriate buffer sizes for different workloads
    • Implement memory pressure handling
  2. CPU Optimization:

    • Monitor message processing CPU usage
    • Use multi-threading for parallel processing
    • Implement load balancing for multiple handlers
    • Optimize message routing algorithms
  3. System Integration:

    • Coordinate with system resource management
    • Implement graceful degradation under load
    • Monitor system performance impact
    • Use resource limits and quotas

Troubleshooting

High-Performance Issues

  1. Low message throughput

    • Cause: Suboptimal configuration or system constraints
    • Solution: Enable shared memory, increase buffer sizes, optimize performance mode
    • Monitor: Message throughput rates, queue sizes, processing latency
    • Optimize: Performance mode settings, shared memory configuration
  2. High memory usage

    • Cause: Large shared memory segments or buffer overflow
    • Solution: Optimize shared memory sizes, implement cleanup
    • Monitor: Shared memory usage, buffer utilization, memory fragmentation
    • Check: Memory leaks, cleanup procedures, segment management
  3. Message processing latency

    • Cause: Processing bottlenecks or system load
    • Solution: Enable priority queues, optimize message processing
    • Monitor: Processing times, queue depths, system load
    • Optimize: Priority settings, processing algorithms, resource allocation
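
When diagnosing low throughput, it helps to be clear about what a messages-per-second figure means. The sketch below derives a rate from two samples of a running message counter. It is a hypothetical helper for external monitoring: ThroughputMeter is not part of the TopBus API, and how getMessageThroughput computes its value is not documented here.

```cpp
#include <cstdint>

// Hypothetical helper: derives messages/second from successive samples of a
// monotonically increasing message counter.
class ThroughputMeter {
public:
    // Records a sample. `totalMessages` is the running message count and
    // `nowSeconds` is a monotonic timestamp supplied by the caller.
    // Returns the rate since the previous sample, or 0.0 for the first
    // sample, a non-advancing clock, or a counter that went backwards.
    double sample(std::uint64_t totalMessages, double nowSeconds) {
        double rate = 0.0;
        if (hasPrevious_ && nowSeconds > lastTime_ && totalMessages >= lastCount_) {
            rate = static_cast<double>(totalMessages - lastCount_) /
                   (nowSeconds - lastTime_);
        }
        lastCount_ = totalMessages;
        lastTime_ = nowSeconds;
        hasPrevious_ = true;
        return rate;
    }

private:
    std::uint64_t lastCount_ = 0;
    double lastTime_ = 0.0;
    bool hasPrevious_ = false;
};
```

For example, 5000 messages counted over 2 seconds gives 2500 msg/sec, which would sit above the 1000 msg/sec alert threshold used in the standalone messaging example.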

Shared Memory Issues

  1. Shared memory creation fails

    • Cause: Insufficient system resources or permissions
    • Solution: Check system limits, adjust permissions
    • Check: System shared memory limits, user permissions, available memory
    • Debug: System logs, shared memory status, resource usage
  2. Shared memory corruption

    • Cause: Concurrent access issues or improper cleanup
    • Solution: Implement proper synchronization, cleanup procedures
    • Monitor: Data integrity, access patterns, corruption events
    • Debug: Memory access logs, synchronization issues

PTZ Integration Issues

  1. PTZ control not responding

    • Cause: Communication issues or configuration problems
    • Solution: Check PTZ connectivity, verify configuration
    • Monitor: PTZ command response times, connection status
    • Debug: PTZ communication logs, command execution status
  2. Idle detection not working

    • Cause: PTZ status updates not received or processing issues
    • Solution: Verify PTZ status reporting, check processing logic
    • Monitor: PTZ status updates, idle detection events
    • Debug: Status message flow, idle detection logic

VANA Integration Issues

  1. VANA data format errors

    • Cause: Data structure incompatibilities or conversion issues
    • Solution: Verify VANA format requirements, update conversion logic
    • Check: Data format specifications, conversion algorithms
    • Debug: VANA data validation, format conversion logs
  2. Performance degradation with VANA

    • Cause: Complex data processing or large data volumes
    • Solution: Optimize VANA processing, enable shared memory
    • Monitor: VANA processing times, data throughput
    • Optimize: Processing algorithms, memory usage, data flow

Integration Examples

Enterprise Surveillance System

{
  "output": {
    "handlers": {
      "enterprise-analytics": {
        "uri": "topbus://enterprise-surveillance",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "enterprise_surveillance",
          "useSharedMemory": true,
          "bufferSize": 15000,
          "sharedMemorySize": "512MB",
          "enableVanaFormat": true,
          "performanceMode": "maximum",
          "messageCompression": true,
          "priorityQueue": true,
          "eventFiltering": {
            "enabled": true,
            "allowedTypes": ["intrusion", "unauthorized_access", "crowd_detection", "abandoned_object"],
            "minConfidence": 0.85
          }
        }
      },
      "ptz-coordination": {
        "uri": "topbus://enterprise-ptz",
        "sink": "ptz_idle_request",
        "enabled": true,
        "config": {
          "messageType": "ptz_enterprise",
          "ptzEnabled": true,
          "idleDetection": true,
          "realTimeControl": true,
          "enterpriseCoordination": true,
          "autoTracking": true
        }
      }
    }
  }
}

Real-Time Manufacturing Monitoring

{
  "output": {
    "handlers": {
      "manufacturing-monitor": {
        "uri": "topbus://manufacturing-realtime",
        "sink": "eventsExport", 
        "enabled": true,
        "config": {
          "messageType": "manufacturing_monitoring",
          "useSharedMemory": true,
          "bufferSize": 8000,
          "sharedMemorySize": "256MB",
          "performanceMode": "maximum",
          "realTimeOptimized": true,
          "eventFiltering": {
            "allowedTypes": ["quality_issue", "safety_violation", "production_anomaly"],
            "realTimeOnly": true
          }
        }
      }
    }
  }
}

See Also