NBus Plugin

Description

The NBus plugin provides internal message bus communication capabilities through the Output plugin system. It registers the nbus:// URI scheme and serves as a specialized output handler for inter-process communication, event distribution, and data exchange within CVEDIA-RT components. NBus acts as a centralized messaging system that enables different modules and instances to communicate efficiently.

Key Features

  • Internal Message Bus: Centralized communication hub for CVEDIA-RT components
  • Output Handler Integration: Registered nbus:// URI scheme with Output plugin
  • Multi-Sink Support: Handles various data types (source_info, eventsExport, locked_tracks, etc.)
  • Event Distribution: Efficient event routing between system components
  • VANA Data Structure: Integration with VANA (Video Analytics) data formats
  • Track Management: Specialized handling of tracking data and movement events
  • Frame Processing Coordination: Synchronization of frame processing workflows
  • Singleton Architecture: Centralized NBusCore instance for system-wide coordination

When to Use

  • Inter-process communication within CVEDIA-RT ecosystem
  • Event distribution between different system components
  • Coordination of tracking and analytics workflows
  • Integration with VANA (Video Analytics) systems
  • Centralized data exchange for distributed processing
  • System-wide event notification and coordination
  • Specialized messaging for internal CVEDIA-RT operations

Requirements

Software Dependencies

  • CVEDIA-RT core system
  • NBus core libraries and messaging infrastructure
  • VANA data structure support (if using analytics features)

System Requirements

  • Sufficient memory for message queuing and buffering
  • CPU resources for message routing and processing
  • Inter-process communication capabilities

Configuration

The NBus plugin is configured through the Output plugin system using the nbus:// URI scheme.

Basic Configuration

{
  "output": {
    "handlers": {
      "nbus-events": {
        "uri": "nbus://events",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "events",
          "bufferSize": 1000
        }
      }
    }
  }
}

Multi-Sink Configuration

{
  "output": {
    "handlers": {
      "nbus-source-info": {
        "uri": "nbus://source-info",
        "sink": "source_info",
        "enabled": true,
        "config": {
          "messageType": "source_info"
        }
      },
      "nbus-events-export": {
        "uri": "nbus://events-export",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "events"
        }
      },
      "nbus-locked-tracks": {
        "uri": "nbus://locked-tracks",
        "sink": "locked_tracks",
        "enabled": true,
        "config": {
          "messageType": "tracks"
        }
      },
      "nbus-frame-processed": {
        "uri": "nbus://frame-processed",
        "sink": "frame_processed",
        "enabled": true,
        "config": {
          "messageType": "frame_sync"
        }
      }
    }
  }
}
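The handler entries above carry the same four pieces of information that `output:addHandler` receives in the Lua examples later in this page (name, URI, sink, config). The following is a minimal sketch, assuming that correspondence holds and that the configuration has already been decoded into a Lua table. `collectHandlerArgs` is a hypothetical helper, not part of the CVEDIA-RT API, and one entry is disabled purely for illustration.

```lua
-- Hypothetical helper: flatten an "output.handlers" table into a sorted list
-- of argument tuples suitable for output:addHandler(name, uri, sink, config).
-- Entries with enabled = false are skipped; a missing "enabled" counts as true.
local function collectHandlerArgs(handlers)
    local result = {}
    for name, entry in pairs(handlers) do
        if entry.enabled ~= false then
            table.insert(result, {
                name = name,
                uri = entry.uri,
                sink = entry.sink,
                config = entry.config or {}
            })
        end
    end
    -- pairs() has no defined order, so sort by name for reproducible output
    table.sort(result, function(a, b) return a.name < b.name end)
    return result
end

-- Decoded form of two entries from the multi-sink configuration
local handlers = {
    ["nbus-source-info"] = {
        uri = "nbus://source-info", sink = "source_info", enabled = true,
        config = { messageType = "source_info" }
    },
    ["nbus-locked-tracks"] = {
        uri = "nbus://locked-tracks", sink = "locked_tracks", enabled = false,
        config = { messageType = "tracks" }
    }
}

for _, h in ipairs(collectHandlerArgs(handlers)) do
    print(h.name, h.uri, h.sink)
    -- In a CVEDIA-RT script this would become:
    -- output:addHandler(h.name, h.uri, h.sink, h.config)
end
```

Sorting by name is only there to make the registration order predictable; nothing in the configuration format implies an ordering requirement.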

Advanced Configuration

{
  "output": {
    "handlers": {
      "nbus-analytics": {
        "uri": "nbus://analytics-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "analytics",
          "bufferSize": 2000,
          "queueTimeout": 1000,
          "enableVanaFormat": true,
          "trackingEnabled": true,
          "eventFiltering": {
            "enabled": true,
            "allowedTypes": ["intrusion", "line_crossing", "object_detection"]
          }
        }
      }
    }
  }
}

Configuration Schema

| Parameter | Type | Default | Description |
|---|---|---|---|
| uri | string | required | NBus URI (nbus://path) |
| sink | string | required | Specific sink type for NBus processing |
| enabled | boolean | true | Enable/disable NBus output |
| config.messageType | string | "generic" | Type of messages to handle |
| config.bufferSize | integer | 1000 | Message buffer size |
| config.queueTimeout | integer | 1000 | Queue timeout in milliseconds |
| config.enableVanaFormat | boolean | false | Enable VANA data structure format |
| config.trackingEnabled | boolean | true | Enable tracking data processing |
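
To make the effective values of a handler explicit, the defaults from the schema above can be applied in script before the handler is registered. This is a sketch only: `normalizeHandler` and `CONFIG_DEFAULTS` are hypothetical names, and whether the plugin fills in the same defaults internally is not confirmed here.

```lua
-- Defaults copied from the configuration schema
local CONFIG_DEFAULTS = {
    messageType = "generic",
    bufferSize = 1000,
    queueTimeout = 1000,      -- milliseconds
    enableVanaFormat = false,
    trackingEnabled = true
}

-- Hypothetical helper: returns a normalized copy of the handler entry,
-- or nil plus an error message when a required field is missing.
local function normalizeHandler(entry)
    if type(entry.uri) ~= "string" then
        return nil, "uri is required"
    end
    if type(entry.sink) ~= "string" then
        return nil, "sink is required"
    end

    local normalized = {
        uri = entry.uri,
        sink = entry.sink,
        enabled = entry.enabled ~= false,  -- default: true
        config = {}
    }
    -- Copy caller-supplied settings first, then fill the gaps with defaults
    for key, value in pairs(entry.config or {}) do
        normalized.config[key] = value
    end
    for key, value in pairs(CONFIG_DEFAULTS) do
        if normalized.config[key] == nil then
            normalized.config[key] = value
        end
    end
    return normalized
end

local handler = normalizeHandler({
    uri = "nbus://events",
    sink = "eventsExport",
    config = { bufferSize = 2000 }
})
-- bufferSize stays 2000; messageType falls back to "generic"
print(handler.config.bufferSize, handler.config.messageType)
```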

URI Scheme

The NBus plugin registers the nbus:// URI scheme with the Output plugin system.

URI Format

nbus://[path][?options]

Components:

  • path: Optional path identifier for message routing
  • options: URL-encoded query parameters for configuration
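
The format above can be split into its two components with Lua string patterns. The sketch below is illustrative and is not the plugin's own parser: `parseNBusUri` is a hypothetical helper, and the option names in the usage lines (`bufferSize`, `mode`) are placeholders, since the set of supported query options is not listed here.

```lua
-- Hypothetical parser for nbus://[path][?options].
-- Returns the path and a table of query options, or nil plus an error message.
local function parseNBusUri(uri)
    local rest = uri:match("^nbus://(.*)$")
    if not rest then
        return nil, "URI must start with nbus://"
    end

    -- Everything before the first "?" is the path, the remainder the query
    local path, query = rest:match("^([^?]*)%??(.*)$")

    -- Decode %XX escapes in keys and values
    local function decode(s)
        return (s:gsub("%%(%x%x)", function(hex)
            return string.char(tonumber(hex, 16))
        end))
    end

    local options = {}
    for key, value in query:gmatch("([^&=]+)=([^&]*)") do
        options[decode(key)] = decode(value)
    end
    return path, options
end

local path, options = parseNBusUri("nbus://analytics-hub?bufferSize=2000&mode=a%20b")
print(path)                -- analytics-hub
print(options.bufferSize)  -- 2000 (query values are strings)
print(options.mode)        -- a b
```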

URI Examples

nbus://events                    # Basic event messaging
nbus://analytics-hub             # Analytics data hub
nbus://tracking-coordinator      # Tracking coordination
nbus://vana-integration         # VANA system integration

Supported Sink Types

The NBus plugin handles specific sink types with specialized processing:

Source Information (source_info)

  • Purpose: Initialize or update triggers based on source data
  • Processing: Handles source configuration and trigger setup
  • Data Format: Source metadata and configuration information

Events Export (eventsExport)

  • Purpose: Process analytics events for distribution
  • Processing: Prepares VANA data structure for event export
  • Data Format: Structured analytics events and detections

Locked Tracks (locked_tracks)

  • Purpose: Manage persistent tracking data
  • Processing: Adds locked tracks to VANA data structure
  • Data Format: Track objects with locked/confirmed status

Movement Tracks (object_left_removed_motion_tracks)

  • Purpose: Handle object movement and state changes
  • Processing: Tracks from object left/removed triggers
  • Data Format: Movement tracking data and object state changes

Frame Processed (frame_processed)

  • Purpose: Coordinate frame processing completion
  • Processing: Processes VANA data and clears temporary data
  • Data Format: Frame processing status and cleanup signals
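
Because a misspelled sink name is an easy mistake to make, a script can check the name against the sinks documented above before registering a handler. This is a sketch under stated assumptions: `SUPPORTED_SINKS` and `isSupportedSink` are hypothetical names, the list is copied from this section rather than queried from the plugin, and treating names as case-sensitive is an assumption.

```lua
-- Sink names and purposes taken from this section
local SUPPORTED_SINKS = {
    source_info = "Initialize or update triggers based on source data",
    eventsExport = "Process analytics events for distribution",
    locked_tracks = "Manage persistent tracking data",
    object_left_removed_motion_tracks = "Handle object movement and state changes",
    frame_processed = "Coordinate frame processing completion"
}

-- Hypothetical pre-flight check; names are compared case-sensitively
local function isSupportedSink(sink)
    return SUPPORTED_SINKS[sink] ~= nil
end

for _, sink in ipairs({ "eventsExport", "events_export", "frame_processed" }) do
    if isSupportedSink(sink) then
        print(sink .. ": " .. SUPPORTED_SINKS[sink])
    else
        print(sink .. ": not a documented NBus sink")
    end
end
```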

API Reference

C++ API

The NBus plugin implements the iface::OutputHandler interface with specialized message bus functionality:

class NBusOutputHandler : public iface::OutputHandler {
public:
    // Constructor
    NBusOutputHandler(const std::string& moduleName,
                     const std::string& schema, 
                     const std::string& sink,
                     const std::string& path, 
                     const pCValue& config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // Message bus operations
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;

    // Handler information
    std::string getSink() override;
};

NBus Core Classes

// Singleton message bus core
class NBusCore {
public:
    // Singleton access
    static NBusCore* getInstance();

    // Sink-specific handlers
    expected<bool> handleSourceSink(pCValue sinkData);
    expected<bool> handleEventsExportSink(pCValue sinkData);
    expected<bool> handleLockedTracksSink(pCValue sinkData);
    expected<bool> handleMovementTracksSink(pCValue sinkData);
    expected<bool> handleFrameProcessedSink(pCValue sinkData);

    // Message bus operations
    bool publishMessage(const std::string& topic, pCValue data);
    bool subscribeToTopic(const std::string& topic, MessageHandler handler);
    void unsubscribeFromTopic(const std::string& topic);

    // VANA integration
    bool prepareVanaData(pCValue eventData);
    void clearTemporaryData();
};

// Managed NBus interface
class NBusManaged : public iface::NBus {
public:
    // Factory method
    static std::unique_ptr<iface::NBus> create(const std::string& moduleName);

    // Message operations
    bool sendMessage(const std::string& destination, pCValue message);
    pCValue receiveMessage(const std::string& source);
    bool broadcast(pCValue message);

    // Subscription management
    bool subscribe(const std::string& topic);
    bool unsubscribe(const std::string& topic);
};
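
The `publishMessage`, `subscribeToTopic`, and `unsubscribeFromTopic` declarations suggest a topic-based publish/subscribe model. The toy bus below illustrates that model in plain Lua. It borrows the method names for readability, but its behavior (one handler per topic, synchronous delivery) is an assumption made for the example and says nothing about how NBusCore is implemented.

```lua
-- Toy in-process bus; illustrative only, not NBusCore.
local ToyBus = {}
ToyBus.__index = ToyBus

function ToyBus.new()
    return setmetatable({ handlers = {} }, ToyBus)
end

-- Register (or replace) the handler for a topic
function ToyBus:subscribeToTopic(topic, handler)
    if type(handler) ~= "function" then
        return false
    end
    self.handlers[topic] = handler
    return true
end

function ToyBus:unsubscribeFromTopic(topic)
    self.handlers[topic] = nil
end

-- Deliver data to the topic's handler; false means nobody was listening
function ToyBus:publishMessage(topic, data)
    local handler = self.handlers[topic]
    if not handler then
        return false
    end
    handler(data)
    return true
end

local bus = ToyBus.new()
local received = {}
bus:subscribeToTopic("analytics/events", function(data)
    table.insert(received, data.type)
end)

local firstDelivered = bus:publishMessage("analytics/events", { type = "intrusion" })
bus:unsubscribeFromTopic("analytics/events")
local secondDelivered = bus:publishMessage("analytics/events", { type = "loitering" })
print(firstDelivered, secondDelivered, #received)
```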

Lua API

The NBus plugin is accessed through the Output plugin system and standalone factory:

-- Via Output plugin system (recommended for data distribution)
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Add NBus output handler for events
local nbusHandler = output:addHandler(
    "nbus-events",
    "nbus://events-hub",
    "eventsExport",
    {
        messageType = "analytics",
        bufferSize = 1000
    }
)

-- Add NBus handler for tracking data
local trackingHandler = output:addHandler(
    "nbus-tracking",
    "nbus://tracking-coordinator",
    "locked_tracks",
    {
        messageType = "tracks",
        trackingEnabled = true
    }
)

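-- Note: Standalone NBus factory may not be available in all installations
-- NBus is primarily used through the Output plugin system
-- If available in your installation, uncomment the next line. The calls
-- below require the resulting `nbus` object and fail if it is nil:
-- local nbus = api.factory.nbus.create("NBus")

-- Message operations (messageData and broadcastData are placeholder tables)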
nbus:sendMessage("destination", messageData)
local receivedMessage = nbus:receiveMessage("source")
nbus:broadcast(broadcastData)

-- Subscription management
nbus:subscribe("analytics/events")
nbus:unsubscribe("analytics/events")

Note: NBus operates as both an output handler integrated with the Output plugin system and a standalone messaging interface for direct inter-process communication.

Examples

Basic Event Distribution

-- Create output instance for NBus event distribution
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Configure NBus for analytics events
local nbusConfig = {
    messageType = "analytics",
    bufferSize = 1000,
    enableVanaFormat = true
}

local eventHandler = output:addHandler(
    "nbus-analytics",
    "nbus://analytics-hub",
    "eventsExport",
    nbusConfig
)

if eventHandler then
    print("NBus event distribution configured")
    print("Hub: analytics-hub")
    print("Sink: eventsExport")
    print("VANA format enabled")

    -- Analytics events will be automatically distributed through NBus
    -- to other CVEDIA-RT components subscribing to the analytics hub
else
    print("Failed to create NBus event handler")
end

Multi-Component Coordination

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Configure multiple NBus handlers for different data types
local nbusHandlers = {
    {
        name = "source-coordinator",
        uri = "nbus://source-info",
        sink = "source_info",
        config = {
            messageType = "source_management",
            bufferSize = 500
        }
    },
    {
        name = "event-distributor",
        uri = "nbus://events-hub", 
        sink = "eventsExport",
        config = {
            messageType = "analytics",
            bufferSize = 2000,
            enableVanaFormat = true
        }
    },
    {
        name = "tracking-manager",
        uri = "nbus://tracking-coordinator",
        sink = "locked_tracks",
        config = {
            messageType = "tracking",
            trackingEnabled = true,
            bufferSize = 1500
        }
    },
    {
        name = "frame-processor",
        uri = "nbus://frame-sync",
        sink = "frame_processed",
        config = {
            messageType = "frame_sync",
            queueTimeout = 500
        }
    }
}

-- Create all NBus coordination handlers
local activeHandlers = {}
for _, handler in ipairs(nbusHandlers) do
    local nbusHandler = output:addHandler(
        handler.name,
        handler.uri,
        handler.sink,
        handler.config
    )

    if nbusHandler then
        table.insert(activeHandlers, handler.name)
        print("NBus handler created: " .. handler.name)
        print("  URI: " .. handler.uri)
        print("  Sink: " .. handler.sink)
        print("  Type: " .. handler.config.messageType)
    else
        print("Failed to create NBus handler: " .. handler.name)
    end
end

print("Active NBus coordination handlers: " .. #activeHandlers)

VANA Integration

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Configure NBus for VANA system integration
local vanaConfig = {
    messageType = "vana_analytics",
    bufferSize = 3000,
    enableVanaFormat = true,
    trackingEnabled = true,
    eventFiltering = {
        enabled = true,
        allowedTypes = {"intrusion", "line_crossing", "object_detection", "abandoned_object"}
    }
}

local vanaHandler = output:addHandler(
    "vana-integration",
    "nbus://vana-hub",
    "eventsExport",
    vanaConfig
)

if vanaHandler then
    print("VANA integration configured through NBus")
    print("Event filtering enabled")
    print("Tracking integration active")
    print("VANA data format enabled")

    -- Configure additional handlers for VANA workflow
    local vanaTrackingHandler = output:addHandler(
        "vana-tracking",
        "nbus://vana-tracking",
        "locked_tracks",
        {
            messageType = "vana_tracking",
            enableVanaFormat = true
        }
    )

    local vanaMovementHandler = output:addHandler(
        "vana-movement",
        "nbus://vana-movement",
        "object_left_removed_motion_tracks", 
        {
            messageType = "vana_movement",
            enableVanaFormat = true
        }
    )

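    -- Only report success if both additional handlers were created
    if vanaTrackingHandler and vanaMovementHandler then
        print("Complete VANA workflow integration established")
    else
        print("VANA tracking or movement handler could not be created")
    end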
end

Inter-Instance Communication

-- Configure NBus for communication between CVEDIA-RT instances
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Get current instance information
local currentInstanceId = instance:getInstanceId()
local instanceName = instance:getName()

-- Configure NBus handlers for inter-instance communication
local interInstanceConfig = {
    messageType = "inter_instance",
    bufferSize = 1000,
    sourceInstanceId = currentInstanceId,
    sourceInstanceName = instanceName
}

local commHandler = output:addHandler(
    "inter-instance-comm",
    "nbus://inter-instance-hub",
    "eventsExport",
    interInstanceConfig
)

-- Also configure for receiving coordination messages
local coordHandler = output:addHandler(
    "coordination-handler",
    "nbus://coordination-hub",
    "source_info",
    {
        messageType = "coordination",
        targetInstance = "all"
    }
)

if commHandler and coordHandler then
    print("Inter-instance communication configured")
    print("Instance ID: " .. currentInstanceId)
    print("Instance Name: " .. instanceName)
    print("Communication hub: inter-instance-hub")
    print("Coordination hub: coordination-hub")
end

Standalone NBus Messaging

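-- Note: Direct NBus messaging requires NBus factory availability.
-- This example assumes the factory exists; uncomment the next line to
-- create the `nbus` object used throughout:
-- local nbus = api.factory.nbus.create("NBus")

-- Instance handle used by sendSystemStatus and broadcastAlert below
local instance = api.thread.getCurrentInstance()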

-- Subscribe to analytics events from other components
nbus:subscribe("analytics/events")
nbus:subscribe("tracking/updates")
nbus:subscribe("system/status")

-- Message processing function
function processNBusMessages()
    -- Check for analytics events
    local analyticsMessage = nbus:receiveMessage("analytics/events")
    if analyticsMessage then
        print("Received analytics event:")
        print("  Type: " .. analyticsMessage.type)
        print("  Confidence: " .. analyticsMessage.confidence)
    end

    -- Check for tracking updates
    local trackingMessage = nbus:receiveMessage("tracking/updates")
    if trackingMessage then
        print("Received tracking update:")
        print("  Track ID: " .. trackingMessage.track_id)
        print("  Status: " .. trackingMessage.status)
    end

    -- Check for system status
    local statusMessage = nbus:receiveMessage("system/status")
    if statusMessage then
        print("System status update:")
        print("  Component: " .. statusMessage.component)
        print("  Status: " .. statusMessage.status)
    end
end

-- Send status updates
function sendSystemStatus(status)
    local statusMessage = {
        timestamp = os.time(),
        component = "analytics-engine",
        instance_id = instance:getInstanceId(),
        status = status,
        details = {
            -- Add actual metrics based on your implementation
            -- fps would come from input:getFPS() or similar
            -- System metrics require platform-specific implementation
        }
    }

    nbus:sendMessage("system/status", statusMessage)
end

-- Broadcast important notifications
function broadcastAlert(alertType, message)
    local alert = {
        timestamp = os.time(),
        type = alertType,
        source = instance:getName(),
        message = message,
        priority = "high"
    }

    nbus:broadcast(alert)
end

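-- Main processing loop
local lastStatusTime = 0

function onTick()
    processNBusMessages()

    -- Send periodic status updates, at most once every 60 seconds.
    -- Comparing against the last send time avoids duplicate sends when
    -- onTick runs several times within one second, and missed sends when
    -- no tick lands exactly on a minute boundary.
    local now = os.time()
    if now - lastStatusTime >= 60 then
        lastStatusTime = now
        sendSystemStatus("running")
    end
end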

Event Filtering and Routing

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Configure NBus with advanced event filtering
local routingConfigs = {
    {
        name = "critical-events",
        uri = "nbus://critical-alerts",
        sink = "eventsExport",
        config = {
            messageType = "critical",
            bufferSize = 500,
            eventFiltering = {
                enabled = true,
                allowedTypes = {"intrusion", "unauthorized_access"},
                minConfidence = 0.9,
                priority = "high"
            }
        }
    },
    {
        name = "general-analytics",
        uri = "nbus://analytics-stream",
        sink = "eventsExport", 
        config = {
            messageType = "analytics",
            bufferSize = 2000,
            eventFiltering = {
                enabled = true,
                allowedTypes = {"object_detection", "line_crossing", "loitering"},
                minConfidence = 0.7,
                priority = "medium"
            }
        }
    },
    {
        name = "tracking-events",
        uri = "nbus://tracking-stream",
        sink = "locked_tracks",
        config = {
            messageType = "tracking",
            bufferSize = 1000,
            trackingEnabled = true,
            eventFiltering = {
                enabled = true,
                trackingStates = {"active", "confirmed", "lost"},
                minTrackDuration = 2.0
            }
        }
    }
}

-- Create filtered event routing
local routingHandlers = {}
for _, config in ipairs(routingConfigs) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(routingHandlers, config.name)
        print("Event routing configured: " .. config.name)
        print("  Filter: " .. config.config.messageType)
        if config.config.eventFiltering and config.config.eventFiltering.allowedTypes then
            print("  Types: " .. table.concat(config.config.eventFiltering.allowedTypes, ", "))
        end
    end
end

print("NBus event routing active: " .. #routingHandlers .. " streams")
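
The best-practice advice to filter events at the source can be followed with a small predicate that reads the same `eventFiltering` fields used in the routing configurations above. This is a sketch, not the plugin's filtering logic: `passesFilter` is a hypothetical helper, and it assumes each event carries `type` and `confidence` fields, as the message examples elsewhere in this page do.

```lua
-- Hypothetical source-side filter based on allowedTypes and minConfidence
local function passesFilter(event, filtering)
    if not filtering or not filtering.enabled then
        return true  -- filtering disabled: let everything through
    end
    if filtering.allowedTypes then
        local allowed = false
        for _, allowedType in ipairs(filtering.allowedTypes) do
            if event.type == allowedType then
                allowed = true
                break
            end
        end
        if not allowed then
            return false
        end
    end
    if filtering.minConfidence then
        -- A missing confidence value is treated as 0
        if (event.confidence or 0) < filtering.minConfidence then
            return false
        end
    end
    return true
end

local criticalFilter = {
    enabled = true,
    allowedTypes = { "intrusion", "unauthorized_access" },
    minConfidence = 0.9
}

print(passesFilter({ type = "intrusion", confidence = 0.95 }, criticalFilter))  -- true
print(passesFilter({ type = "intrusion", confidence = 0.5 }, criticalFilter))   -- false
print(passesFilter({ type = "loitering", confidence = 0.99 }, criticalFilter))  -- false
```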

Best Practices

Message Bus Architecture

  1. Centralized Coordination:

    • Use NBus as central hub for inter-component communication
    • Implement clear message routing patterns
    • Design for scalability and component isolation
    • Plan message flow and dependencies
  2. Sink Selection:

    • Use appropriate sink types for different data categories
    • eventsExport for analytics events and detections
    • locked_tracks for persistent tracking data
    • source_info for configuration and source management
    • frame_processed for workflow synchronization
  3. VANA Integration:

    • Enable VANA format when integrating with Video Analytics systems
    • Use proper data structure conversion
    • Implement error handling for VANA data processing
    • Monitor VANA data flow and formatting

Performance Optimization

  1. Buffer Management:

    • Configure appropriate buffer sizes for message volume
    • Monitor memory usage for high-throughput scenarios
    • Implement buffer overflow handling
    • Use queue timeouts to prevent blocking
  2. Message Filtering:

    • Filter events at the source to reduce NBus load
    • Use confidence thresholds for quality control
    • Implement priority-based message routing
    • Remove unnecessary data before transmission
  3. Singleton Usage:

    • NBusCore uses singleton pattern for system-wide coordination
    • Avoid creating multiple NBus instances unnecessarily
    • Leverage shared NBus instance for efficient resource usage
    • Monitor singleton resource consumption
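
One way to reason about buffer sizing and overflow handling is a bounded queue that discards the oldest message when full and counts how many were lost. The sketch below shows that policy in plain Lua. `BoundedQueue` is a hypothetical class, and drop-oldest is a design choice made for the example; how NBus itself behaves when `bufferSize` is exceeded is not stated here.

```lua
-- Hypothetical bounded FIFO: when full, the oldest message is dropped and counted
local BoundedQueue = {}
BoundedQueue.__index = BoundedQueue

function BoundedQueue.new(capacity)
    assert(capacity > 0, "capacity must be positive")
    return setmetatable({
        capacity = capacity,
        head = 1,      -- index of the oldest stored message
        size = 0,
        dropped = 0,
        items = {}
    }, BoundedQueue)
end

function BoundedQueue:push(message)
    if self.size == self.capacity then
        -- Overflow: discard the oldest entry to make room
        self.items[self.head] = nil
        self.head = self.head + 1
        self.size = self.size - 1
        self.dropped = self.dropped + 1
    end
    self.items[self.head + self.size] = message
    self.size = self.size + 1
end

function BoundedQueue:pop()
    if self.size == 0 then
        return nil
    end
    local message = self.items[self.head]
    self.items[self.head] = nil
    self.head = self.head + 1
    self.size = self.size - 1
    return message
end

local queue = BoundedQueue.new(2)
queue:push("a")
queue:push("b")
queue:push("c")  -- queue is full, so "a" is dropped
local first = queue:pop()
local second = queue:pop()
print(first, second, queue.dropped)
```

A non-zero `dropped` counter is a simple signal that the buffer is too small for the message rate or that the consumer is too slow.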

Error Handling

  1. Message Delivery:

    • NBus provides internal message reliability
    • Implement application-level acknowledgments if needed
    • Monitor message queue status and overflow conditions
    • Handle message processing failures gracefully
  2. Component Failures:

    • Design for component failure resilience
    • Implement timeout mechanisms for message processing
    • Use retry logic for critical message delivery
    • Monitor component health through NBus
  3. Resource Management:

    • Monitor NBus core resource usage
    • Implement proper cleanup for message handlers
    • Handle memory pressure and queue management
    • Monitor system performance impact
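
The retry advice for critical messages can be sketched as a wrapper around any send function. `sendWithRetry` is a hypothetical helper. It assumes the wrapped call returns a truthy value on success, as the boolean `sendMessage` signature suggests, and it retries immediately because standard Lua has no sleep function; a real deployment would likely space the attempts out.

```lua
-- Hypothetical retry wrapper: calls sendFn until it returns a truthy value
-- or maxAttempts is reached. Errors raised by sendFn count as failed attempts.
local function sendWithRetry(sendFn, maxAttempts)
    local lastError = "send returned false"
    for attempt = 1, maxAttempts do
        local ok, result = pcall(sendFn)
        if ok and result then
            return true, attempt
        end
        if not ok then
            lastError = tostring(result)
        else
            lastError = "send returned false"
        end
    end
    return false, lastError
end

-- Simulated sender that fails twice before succeeding
local calls = 0
local function flakySend()
    calls = calls + 1
    return calls >= 3
end

local delivered, attempts = sendWithRetry(flakySend, 5)
print(delivered, attempts)

-- With a standalone NBus object this might look like:
-- sendWithRetry(function()
--     return nbus:sendMessage("system/status", statusMessage)
-- end, 3)
```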

Troubleshooting

Handler Registration Issues

  1. "Cannot register NBusOutputHandler for schema" error

    • Cause: Invalid URI scheme or configuration
    • Solution: Verify URI uses nbus:// scheme correctly
    • Check: URI format, scheme validation, path specification
  2. Handler creation fails

    • Cause: NBusCore initialization issues or resource constraints
    • Solution: Check system resources and NBus core status
    • Check: Log messages, system memory, NBus singleton status
  3. Sink type not recognized

    • Cause: Using unsupported sink type with NBus handler
    • Solution: Use supported sink types (source_info, eventsExport, etc.)
    • Check: Sink name spelling, supported sink list

Message Flow Issues

  1. Messages not being distributed

    • Cause: NBus core not processing messages or routing issues
    • Solution: Check NBus core status and message routing
    • Debug: Enable NBus logging, monitor message queues
    • Check: Message format, routing configuration, subscriber status
  2. High message latency

    • Cause: Buffer overflow, processing bottlenecks, or system load
    • Solution: Increase buffer sizes, optimize message processing
    • Monitor: Queue sizes, processing times, system resources
    • Optimize: Message filtering, buffer configuration, processing logic
  3. Message loss or corruption

    • Cause: Buffer overflow, memory pressure, or processing errors
    • Solution: Implement proper error handling and monitoring
    • Check: Memory usage, buffer sizes, error logs
    • Monitor: Message delivery rates, error frequencies

VANA Integration Issues

  1. VANA data format errors

    • Cause: Incorrect data structure or format conversion issues
    • Solution: Verify VANA format requirements and data structure
    • Check: Data format specification, conversion logic, field mapping
  2. Tracking data inconsistencies

    • Cause: Track state management or data synchronization issues
    • Solution: Review tracking workflow and data flow
    • Monitor: Track states, data consistency, synchronization points
  3. Frame processing synchronization

    • Cause: Frame processing timing or coordination issues
    • Solution: Check frame processing workflow and timing
    • Debug: Frame processing logs, timing analysis, workflow status

Performance Issues

  1. High memory usage

    • Cause: Large message buffers or memory leaks
    • Solution: Optimize buffer sizes, implement cleanup
    • Monitor: Memory consumption, buffer utilization, garbage collection
    • Optimize: Buffer configuration, message filtering, cleanup logic
  2. CPU utilization

    • Cause: Intensive message processing or routing overhead
    • Solution: Optimize message processing logic and routing
    • Profile: CPU usage patterns, processing bottlenecks
    • Optimize: Message filtering, processing algorithms, routing efficiency
  3. System responsiveness

    • Cause: NBus blocking operations or resource contention
    • Solution: Implement non-blocking operations and resource management
    • Monitor: System responsiveness, blocking operations, resource contention

Debugging Tools

  1. NBus Status Monitoring:

    -- Check NBus core status (if NBus factory is available)
    -- local nbus = api.factory.nbus.create("NBus")
    
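    -- Monitor message flow
    -- Note: getStatus() and getQueueStatus() are not part of the NBusCore
    -- interface listed in the API Reference; confirm that your build exposes
    -- NBusCore to Lua with these methods before relying on this snippet.
    function monitorNBusStatus()
        -- Check if NBus core is active
        local coreStatus = NBusCore.getInstance():getStatus()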
        print("NBus Core Status: " .. coreStatus)
    
        -- Monitor message queues
        local queueStatus = NBusCore.getInstance():getQueueStatus()
        for queue, status in pairs(queueStatus) do
            print("Queue " .. queue .. ": " .. status.size .. " messages")
        end
    end
    

  2. Message Flow Analysis:

    -- Enable NBus debug logging
    -- (setLogLevel is not listed in the API Reference; verify it is available)
    function enableNBusDebug()
        NBusCore.getInstance():setLogLevel("DEBUG")
        print("NBus debug logging enabled")
    end
    
    -- Monitor specific message types
    -- If NBus factory is available:
    -- local nbus = api.factory.nbus.create("NBus")
    nbus:subscribe("analytics/events")  -- subscribe once, not on every call

    -- Counts are kept outside the function so they accumulate across calls
    local messageCounts = {}

    function monitorMessageTypes()
        -- Count messages by type (reads one message per call)
        local message = nbus:receiveMessage("analytics/events")
        if message then
            local msgType = message.type or "unknown"
            messageCounts[msgType] = (messageCounts[msgType] or 0) + 1
        end

        -- Report statistics
        for msgType, count in pairs(messageCounts) do
            print("Message type " .. msgType .. ": " .. count .. " received")
        end
    end
    

  3. System Integration Testing:

    -- Test NBus integration
    function testNBusIntegration()
        local instance = api.thread.getCurrentInstance()
        local output = api.factory.output.create(instance, "Output")

        -- Test handler creation
        local testHandler = output:addHandler(
            "test-handler",
            "nbus://test-hub",
            "eventsExport",
            {messageType = "test"}
        )

        if testHandler then
            print("✓ NBus handler creation successful")
        else
            print("✗ NBus handler creation failed")
        end

        -- Test message flow
        local testMessage = {
            type = "test",
            timestamp = os.time(),
            data = "NBus integration test"
        }

        -- Simulate message processing
        print("Testing NBus message flow...")
    end

Integration Examples

Multi-Instance Coordination

-- Configure NBus for coordinating multiple CVEDIA-RT instances
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Instance coordination configuration
local coordinationConfig = {
    messageType = "coordination",
    bufferSize = 1000,
    instanceId = instance:getInstanceId(),
    coordinationRole = "primary",  -- or "secondary"
    heartbeatInterval = 30
}

local coordHandler = output:addHandler(
    "instance-coordination",
    "nbus://multi-instance-coord",
    "source_info",
    coordinationConfig
)

-- Event sharing between instances
local eventSharingHandler = output:addHandler(
    "event-sharing",
    "nbus://shared-events",
    "eventsExport",
    {
        messageType = "shared_events",
        shareWithInstances = {"instance-2", "instance-3"},
        eventTypes = {"critical", "high_priority"}
    }
)

Custom Analytics Pipeline

-- NBus integration for custom analytics pipeline
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")

-- Configure NBus for analytics stages
local pipelineStages = {
    {
        name = "preprocessing",
        uri = "nbus://analytics-preprocessing",
        sink = "source_info",
        config = {messageType = "preprocessing", stage = 1}
    },
    {
        name = "detection",
        uri = "nbus://analytics-detection", 
        sink = "eventsExport",
        config = {messageType = "detection", stage = 2}
    },
    {
        name = "tracking",
        uri = "nbus://analytics-tracking",
        sink = "locked_tracks",
        config = {messageType = "tracking", stage = 3}
    },
    {
        name = "postprocessing",
        uri = "nbus://analytics-postprocessing",
        sink = "frame_processed",
        config = {messageType = "postprocessing", stage = 4}
    }
}

-- Create pipeline coordination
for _, stage in ipairs(pipelineStages) do
    local handler = output:addHandler(stage.name, stage.uri, stage.sink, stage.config)
    if handler then
        print("Analytics pipeline stage configured: " .. stage.name)
    end
end

See Also