NBus Plugin¶
Description¶
The NBus plugin provides internal message bus communication capabilities through the Output plugin system. It registers the nbus://
URI scheme and serves as a specialized output handler for inter-process communication, event distribution, and data exchange within CVEDIA-RT components. NBus acts as a centralized messaging system that enables different modules and instances to communicate efficiently.
Key Features¶
- Internal Message Bus: Centralized communication hub for CVEDIA-RT components
- Output Handler Integration: Registered
nbus://
URI scheme with Output plugin - Multi-Sink Support: Handles various data types (source_info, eventsExport, locked_tracks, etc.)
- Event Distribution: Efficient event routing between system components
- VANA Data Structure: Integration with VANA (Video Analytics) data formats
- Track Management: Specialized handling of tracking data and movement events
- Frame Processing Coordination: Synchronization of frame processing workflows
- Singleton Architecture: Centralized NBusCore instance for system-wide coordination
When to Use¶
- Inter-process communication within CVEDIA-RT ecosystem
- Event distribution between different system components
- Coordination of tracking and analytics workflows
- Integration with VANA (Video Analytics) systems
- Centralized data exchange for distributed processing
- System-wide event notification and coordination
- Specialized messaging for internal CVEDIA-RT operations
Requirements¶
Software Dependencies¶
- CVEDIA-RT core system
- NBus core libraries and messaging infrastructure
- VANA data structure support (if using analytics features)
System Requirements¶
- Sufficient memory for message queuing and buffering
- CPU resources for message routing and processing
- Inter-process communication capabilities
Configuration¶
The NBus plugin is configured through the Output plugin system using the nbus://
URI scheme.
Basic Configuration¶
{
"output": {
"handlers": {
"nbus-events": {
"uri": "nbus://events",
"sink": "eventsExport",
"enabled": true,
"config": {
"messageType": "events",
"bufferSize": 1000
}
}
}
}
}
Multi-Sink Configuration¶
{
"output": {
"handlers": {
"nbus-source-info": {
"uri": "nbus://source-info",
"sink": "source_info",
"enabled": true,
"config": {
"messageType": "source_info"
}
},
"nbus-events-export": {
"uri": "nbus://events-export",
"sink": "eventsExport",
"enabled": true,
"config": {
"messageType": "events"
}
},
"nbus-locked-tracks": {
"uri": "nbus://locked-tracks",
"sink": "locked_tracks",
"enabled": true,
"config": {
"messageType": "tracks"
}
},
"nbus-frame-processed": {
"uri": "nbus://frame-processed",
"sink": "frame_processed",
"enabled": true,
"config": {
"messageType": "frame_sync"
}
}
}
}
}
Advanced Configuration¶
{
"output": {
"handlers": {
"nbus-analytics": {
"uri": "nbus://analytics-hub",
"sink": "eventsExport",
"enabled": true,
"config": {
"messageType": "analytics",
"bufferSize": 2000,
"queueTimeout": 1000,
"enableVanaFormat": true,
"trackingEnabled": true,
"eventFiltering": {
"enabled": true,
"allowedTypes": ["intrusion", "line_crossing", "object_detection"]
}
}
}
}
}
}
Configuration Schema¶
Parameter | Type | Default | Description |
---|---|---|---|
uri |
string | required | NBus URI (nbus://path) |
sink |
string | required | Specific sink type for NBus processing |
enabled |
boolean | true | Enable/disable NBus output |
config.messageType |
string | "generic" | Type of messages to handle |
config.bufferSize |
integer | 1000 | Message buffer size |
config.queueTimeout |
integer | 1000 | Queue timeout in milliseconds |
config.enableVanaFormat |
boolean | false | Enable VANA data structure format |
config.trackingEnabled |
boolean | true | Enable tracking data processing |
URI Scheme¶
The NBus plugin registers the nbus://
URI scheme with the Output plugin system.
URI Format¶
nbus://[path][?options]
Components: - path: Optional path identifier for message routing - options: URL-encoded query parameters for configuration
URI Examples¶
nbus://events # Basic event messaging
nbus://analytics-hub # Analytics data hub
nbus://tracking-coordinator # Tracking coordination
nbus://vana-integration # VANA system integration
Supported Sink Types¶
The NBus plugin handles specific sink types with specialized processing:
Source Information (source_info
)¶
- Purpose: Initialize or update triggers based on source data
- Processing: Handles source configuration and trigger setup
- Data Format: Source metadata and configuration information
Events Export (eventsExport
)¶
- Purpose: Process analytics events for distribution
- Processing: Prepares VANA data structure for event export
- Data Format: Structured analytics events and detections
Locked Tracks (locked_tracks
)¶
- Purpose: Manage persistent tracking data
- Processing: Adds locked tracks to VANA data structure
- Data Format: Track objects with locked/confirmed status
Movement Tracks (object_left_removed_motion_tracks
)¶
- Purpose: Handle object movement and state changes
- Processing: Tracks from object left/removed triggers
- Data Format: Movement tracking data and object state changes
Frame Processed (frame_processed
)¶
- Purpose: Coordinate frame processing completion
- Processing: Processes VANA data and clears temporary data
- Data Format: Frame processing status and cleanup signals
API Reference¶
C++ API¶
The NBus plugin implements the iface::OutputHandler
interface with specialized message bus functionality:
class NBusOutputHandler : public iface::OutputHandler {
public:
// Constructor
NBusOutputHandler(const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
const pCValue& config);
// Factory method (registered with Output plugin)
static std::shared_ptr<iface::OutputHandler> create(
const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// Message bus operations
expected<bool> write(pCValue sinkData = VAL(),
std::string dynamicPath = "") override;
void stop() override;
void close() override;
// Handler information
std::string getSink() override;
};
NBus Core Classes¶
// Singleton message bus core
class NBusCore {
public:
// Singleton access
static NBusCore* getInstance();
// Sink-specific handlers
expected<bool> handleSourceSink(pCValue sinkData);
expected<bool> handleEventsExportSink(pCValue sinkData);
expected<bool> handleLockedTracksSink(pCValue sinkData);
expected<bool> handleMovementTracksSink(pCValue sinkData);
expected<bool> handleFrameProcessedSink(pCValue sinkData);
// Message bus operations
bool publishMessage(const std::string& topic, pCValue data);
bool subscribeToTopic(const std::string& topic, MessageHandler handler);
void unsubscribeFromTopic(const std::string& topic);
// VANA integration
bool prepareVanaData(pCValue eventData);
void clearTemporaryData();
};
// Managed NBus interface
class NBusManaged : public iface::NBus {
public:
// Factory method
static std::unique_ptr<iface::NBus> create(const std::string& moduleName);
// Message operations
bool sendMessage(const std::string& destination, pCValue message);
pCValue receiveMessage(const std::string& source);
bool broadcast(pCValue message);
// Subscription management
bool subscribe(const std::string& topic);
bool unsubscribe(const std::string& topic);
};
Lua API¶
The NBus plugin is accessed through the Output plugin system and standalone factory:
-- Via Output plugin system (recommended for data distribution)
local instance = api.thread.getCurrentInstance()
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Add NBus output handler for events
local nbusHandler = output:addHandler(
"nbus-events",
"nbus://events-hub",
"eventsExport",
{
messageType = "analytics",
bufferSize = 1000
}
)
-- Add NBus handler for tracking data
local trackingHandler = output:addHandler(
"nbus-tracking",
"nbus://tracking-coordinator",
"locked_tracks",
{
messageType = "tracks",
trackingEnabled = true
}
)
-- Note: Standalone NBus factory may not be available in all installations
-- NBus is primarily used through the Output plugin system
-- If available in your installation:
-- local nbus = api.factory.nbus.create("NBus")
-- Message operations
nbus:sendMessage("destination", messageData)
local receivedMessage = nbus:receiveMessage("source")
nbus:broadcast(broadcastData)
-- Subscription management
nbus:subscribe("analytics/events")
nbus:unsubscribe("analytics/events")
Note: NBus operates as both an output handler integrated with the Output plugin system and a standalone messaging interface for direct inter-process communication.
Examples¶
Basic Event Distribution¶
-- Create output instance for NBus event distribution
local instance = api.thread.getCurrentInstance()
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Configure NBus for analytics events
local nbusConfig = {
messageType = "analytics",
bufferSize = 1000,
enableVanaFormat = true
}
local eventHandler = output:addHandler(
"nbus-analytics",
"nbus://analytics-hub",
"eventsExport",
nbusConfig
)
if eventHandler then
print("NBus event distribution configured")
print("Hub: analytics-hub")
print("Sink: eventsExport")
print("VANA format enabled")
-- Analytics events will be automatically distributed through NBus
-- to other CVEDIA-RT components subscribing to the analytics hub
else
print("Failed to create NBus event handler")
end
Multi-Component Coordination¶
local instance = api.thread.getCurrentInstance()
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Configure multiple NBus handlers for different data types
local nbusHandlers = {
{
name = "source-coordinator",
uri = "nbus://source-info",
sink = "source_info",
config = {
messageType = "source_management",
bufferSize = 500
}
},
{
name = "event-distributor",
uri = "nbus://events-hub",
sink = "eventsExport",
config = {
messageType = "analytics",
bufferSize = 2000,
enableVanaFormat = true
}
},
{
name = "tracking-manager",
uri = "nbus://tracking-coordinator",
sink = "locked_tracks",
config = {
messageType = "tracking",
trackingEnabled = true,
bufferSize = 1500
}
},
{
name = "frame-processor",
uri = "nbus://frame-sync",
sink = "frame_processed",
config = {
messageType = "frame_sync",
queueTimeout = 500
}
}
}
-- Create all NBus coordination handlers
local activeHandlers = {}
for _, handler in ipairs(nbusHandlers) do
local nbusHandler = output:addHandler(
handler.name,
handler.uri,
handler.sink,
handler.config
)
if nbusHandler then
table.insert(activeHandlers, handler.name)
print("NBus handler created: " .. handler.name)
print(" URI: " .. handler.uri)
print(" Sink: " .. handler.sink)
print(" Type: " .. handler.config.messageType)
else
print("Failed to create NBus handler: " .. handler.name)
end
end
print("Active NBus coordination handlers: " .. #activeHandlers)
VANA Integration¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Configure NBus for VANA system integration
local vanaConfig = {
messageType = "vana_analytics",
bufferSize = 3000,
enableVanaFormat = true,
trackingEnabled = true,
eventFiltering = {
enabled = true,
allowedTypes = {"intrusion", "line_crossing", "object_detection", "abandoned_object"}
}
}
local vanaHandler = output:addHandler(
"vana-integration",
"nbus://vana-hub",
"eventsExport",
vanaConfig
)
if vanaHandler then
print("VANA integration configured through NBus")
print("Event filtering enabled")
print("Tracking integration active")
print("VANA data format enabled")
-- Configure additional handlers for VANA workflow
local vanaTrackingHandler = output:addHandler(
"vana-tracking",
"nbus://vana-tracking",
"locked_tracks",
{
messageType = "vana_tracking",
enableVanaFormat = true
}
)
local vanaMovementHandler = output:addHandler(
"vana-movement",
"nbus://vana-movement",
"object_left_removed_motion_tracks",
{
messageType = "vana_movement",
enableVanaFormat = true
}
)
print("Complete VANA workflow integration established")
end
Inter-Instance Communication¶
-- Configure NBus for communication between CVEDIA-RT instances
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Get current instance information
local currentInstanceId = instance:getInstanceId()
local instanceName = instance:getName()
-- Configure NBus handlers for inter-instance communication
local interInstanceConfig = {
messageType = "inter_instance",
bufferSize = 1000,
sourceInstanceId = currentInstanceId,
sourceInstanceName = instanceName
}
local commHandler = output:addHandler(
"inter-instance-comm",
"nbus://inter-instance-hub",
"eventsExport",
interInstanceConfig
)
-- Also configure for receiving coordination messages
local coordHandler = output:addHandler(
"coordination-handler",
"nbus://coordination-hub",
"source_info",
{
messageType = "coordination",
targetInstance = "all"
}
)
if commHandler and coordHandler then
print("Inter-instance communication configured")
print("Instance ID: " .. currentInstanceId)
print("Instance Name: " .. instanceName)
print("Communication hub: inter-instance-hub")
print("Coordination hub: coordination-hub")
end
Standalone NBus Messaging¶
-- Note: Direct NBus messaging requires NBus factory availability
-- This example assumes NBus factory is available:
-- local nbus = api.factory.nbus.create("NBus")
-- Subscribe to analytics events from other components
nbus:subscribe("analytics/events")
nbus:subscribe("tracking/updates")
nbus:subscribe("system/status")
-- Message processing function
function processNBusMessages()
-- Check for analytics events
local analyticsMessage = nbus:receiveMessage("analytics/events")
if analyticsMessage then
print("Received analytics event:")
print(" Type: " .. analyticsMessage.type)
print(" Confidence: " .. analyticsMessage.confidence)
end
-- Check for tracking updates
local trackingMessage = nbus:receiveMessage("tracking/updates")
if trackingMessage then
print("Received tracking update:")
print(" Track ID: " .. trackingMessage.track_id)
print(" Status: " .. trackingMessage.status)
end
-- Check for system status
local statusMessage = nbus:receiveMessage("system/status")
if statusMessage then
print("System status update:")
print(" Component: " .. statusMessage.component)
print(" Status: " .. statusMessage.status)
end
end
-- Send status updates
function sendSystemStatus(status)
local statusMessage = {
timestamp = os.time(),
component = "analytics-engine",
instance_id = instance:getInstanceId(),
status = status,
details = {
-- Add actual metrics based on your implementation
-- fps would come from input:getFPS() or similar
-- System metrics require platform-specific implementation
}
}
nbus:sendMessage("system/status", statusMessage)
end
-- Broadcast important notifications
function broadcastAlert(alertType, message)
local alert = {
timestamp = os.time(),
type = alertType,
source = instance:getName(),
message = message,
priority = "high"
}
nbus:broadcast(alert)
end
-- Main processing loop
function onTick()
processNBusMessages()
-- Send periodic status updates
if os.time() % 60 == 0 then -- Every minute
sendSystemStatus("running")
end
end
Event Filtering and Routing¶
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Configure NBus with advanced event filtering
local routingConfigs = {
{
name = "critical-events",
uri = "nbus://critical-alerts",
sink = "eventsExport",
config = {
messageType = "critical",
bufferSize = 500,
eventFiltering = {
enabled = true,
allowedTypes = {"intrusion", "unauthorized_access"},
minConfidence = 0.9,
priority = "high"
}
}
},
{
name = "general-analytics",
uri = "nbus://analytics-stream",
sink = "eventsExport",
config = {
messageType = "analytics",
bufferSize = 2000,
eventFiltering = {
enabled = true,
allowedTypes = {"object_detection", "line_crossing", "loitering"},
minConfidence = 0.7,
priority = "medium"
}
}
},
{
name = "tracking-events",
uri = "nbus://tracking-stream",
sink = "locked_tracks",
config = {
messageType = "tracking",
bufferSize = 1000,
trackingEnabled = true,
eventFiltering = {
enabled = true,
trackingStates = ["active", "confirmed", "lost"],
minTrackDuration = 2.0
}
}
}
}
-- Create filtered event routing
local routingHandlers = {}
for _, config in ipairs(routingConfigs) do
local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
if handler then
table.insert(routingHandlers, config.name)
print("Event routing configured: " .. config.name)
print(" Filter: " .. config.config.messageType)
if config.config.eventFiltering and config.config.eventFiltering.allowedTypes then
print(" Types: " .. table.concat(config.config.eventFiltering.allowedTypes, ", "))
end
end
end
print("NBus event routing active: " .. #routingHandlers .. " streams")
Best Practices¶
Message Bus Architecture¶
-
Centralized Coordination:
- Use NBus as central hub for inter-component communication
- Implement clear message routing patterns
- Design for scalability and component isolation
- Plan message flow and dependencies
-
Sink Selection:
- Use appropriate sink types for different data categories
eventsExport
for analytics events and detectionslocked_tracks
for persistent tracking datasource_info
for configuration and source managementframe_processed
for workflow synchronization
-
VANA Integration:
- Enable VANA format when integrating with Video Analytics systems
- Use proper data structure conversion
- Implement error handling for VANA data processing
- Monitor VANA data flow and formatting
Performance Optimization¶
-
Buffer Management:
- Configure appropriate buffer sizes for message volume
- Monitor memory usage for high-throughput scenarios
- Implement buffer overflow handling
- Use queue timeouts to prevent blocking
-
Message Filtering:
- Filter events at the source to reduce NBus load
- Use confidence thresholds for quality control
- Implement priority-based message routing
- Remove unnecessary data before transmission
-
Singleton Usage:
- NBusCore uses singleton pattern for system-wide coordination
- Avoid creating multiple NBus instances unnecessarily
- Leverage shared NBus instance for efficient resource usage
- Monitor singleton resource consumption
Error Handling¶
-
Message Delivery:
- NBus provides internal message reliability
- Implement application-level acknowledgments if needed
- Monitor message queue status and overflow conditions
- Handle message processing failures gracefully
-
Component Failures:
- Design for component failure resilience
- Implement timeout mechanisms for message processing
- Use retry logic for critical message delivery
- Monitor component health through NBus
-
Resource Management:
- Monitor NBus core resource usage
- Implement proper cleanup for message handlers
- Handle memory pressure and queue management
- Monitor system performance impact
Troubleshooting¶
Handler Registration Issues¶
-
"Cannot register NBusOutputHandler for schema" error
- Cause: Invalid URI scheme or configuration
- Solution: Verify URI uses
nbus://
scheme correctly - Check: URI format, scheme validation, path specification
-
Handler creation fails
- Cause: NBusCore initialization issues or resource constraints
- Solution: Check system resources and NBus core status
- Check: Log messages, system memory, NBus singleton status
-
Sink type not recognized
- Cause: Using unsupported sink type with NBus handler
- Solution: Use supported sink types (source_info, eventsExport, etc.)
- Check: Sink name spelling, supported sink list
Message Flow Issues¶
-
Messages not being distributed
- Cause: NBus core not processing messages or routing issues
- Solution: Check NBus core status and message routing
- Debug: Enable NBus logging, monitor message queues
- Check: Message format, routing configuration, subscriber status
-
High message latency
- Cause: Buffer overflow, processing bottlenecks, or system load
- Solution: Increase buffer sizes, optimize message processing
- Monitor: Queue sizes, processing times, system resources
- Optimize: Message filtering, buffer configuration, processing logic
-
Message loss or corruption
- Cause: Buffer overflow, memory pressure, or processing errors
- Solution: Implement proper error handling and monitoring
- Check: Memory usage, buffer sizes, error logs
- Monitor: Message delivery rates, error frequencies
VANA Integration Issues¶
-
VANA data format errors
- Cause: Incorrect data structure or format conversion issues
- Solution: Verify VANA format requirements and data structure
- Check: Data format specification, conversion logic, field mapping
-
Tracking data inconsistencies
- Cause: Track state management or data synchronization issues
- Solution: Review tracking workflow and data flow
- Monitor: Track states, data consistency, synchronization points
-
Frame processing synchronization
- Cause: Frame processing timing or coordination issues
- Solution: Check frame processing workflow and timing
- Debug: Frame processing logs, timing analysis, workflow status
Performance Issues¶
-
High memory usage
- Cause: Large message buffers or memory leaks
- Solution: Optimize buffer sizes, implement cleanup
- Monitor: Memory consumption, buffer utilization, garbage collection
- Optimize: Buffer configuration, message filtering, cleanup logic
-
CPU utilization
- Cause: Intensive message processing or routing overhead
- Solution: Optimize message processing logic and routing
- Profile: CPU usage patterns, processing bottlenecks
- Optimize: Message filtering, processing algorithms, routing efficiency
-
System responsiveness
- Cause: NBus blocking operations or resource contention
- Solution: Implement non-blocking operations and resource management
- Monitor: System responsiveness, blocking operations, resource contention
Debugging Tools¶
-
NBus Status Monitoring:
-- Check NBus core status (if NBus factory is available) -- local nbus = api.factory.nbus.create("NBus") -- Monitor message flow function monitorNBusStatus() -- Check if NBus core is active local coreStatus = NBusCore.getInstance():getStatus() print("NBus Core Status: " .. coreStatus) -- Monitor message queues local queueStatus = NBusCore.getInstance():getQueueStatus() for queue, status in pairs(queueStatus) do print("Queue " .. queue .. ": " .. status.size .. " messages") end end
-
Message Flow Analysis:
-- Enable NBus debug logging function enableNBusDebug() NBusCore.getInstance():setLogLevel("DEBUG") print("NBus debug logging enabled") end -- Monitor specific message types function monitorMessageTypes() -- If NBus factory is available: -- local nbus = api.factory.nbus.create("NBus") nbus:subscribe("analytics/events") -- Count messages by type local messageCounts = {} local message = nbus:receiveMessage("analytics/events") if message then local msgType = message.type or "unknown" messageCounts[msgType] = (messageCounts[msgType] or 0) + 1 end -- Report statistics for msgType, count in pairs(messageCounts) do print("Message type " .. msgType .. ": " .. count .. " received") end end
-
System Integration Testing: ```lua -- Test NBus integration function testNBusIntegration() local instance = api.thread.getCurrentInstance() local output = api.factory.output.create(instance, "Output")
-- Test handler creation local testHandler = output:addHandler( "test-handler", "nbus://test-hub", "eventsExport", {messageType = "test"} ) if testHandler then print("✓ NBus handler creation successful") else print("✗ NBus handler creation failed") end -- Test message flow local testMessage = { type = "test", timestamp = os.time(), data = "NBus integration test" } -- Simulate message processing print("Testing NBus message flow...")
end ```
Integration Examples¶
Multi-Instance Coordination¶
-- Configure NBus for coordinating multiple CVEDIA-RT instances
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Instance coordination configuration
local coordinationConfig = {
messageType = "coordination",
bufferSize = 1000,
instanceId = instance:getInstanceId(),
coordinationRole = "primary", -- or "secondary"
heartbeatInterval = 30
}
local coordHandler = output:addHandler(
"instance-coordination",
"nbus://multi-instance-coord",
"source_info",
coordinationConfig
)
-- Event sharing between instances
local eventSharingHandler = output:addHandler(
"event-sharing",
"nbus://shared-events",
"eventsExport",
{
messageType = "shared_events",
shareWithInstances = {"instance-2", "instance-3"},
eventTypes = {"critical", "high_priority"}
}
)
Custom Analytics Pipeline¶
-- NBus integration for custom analytics pipeline
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "Output")
-- Configure NBus for analytics stages
local pipelineStages = {
{
name = "preprocessing",
uri = "nbus://analytics-preprocessing",
sink = "source_info",
config = {messageType = "preprocessing", stage = 1}
},
{
name = "detection",
uri = "nbus://analytics-detection",
sink = "eventsExport",
config = {messageType = "detection", stage = 2}
},
{
name = "tracking",
uri = "nbus://analytics-tracking",
sink = "locked_tracks",
config = {messageType = "tracking", stage = 3}
},
{
name = "postprocessing",
uri = "nbus://analytics-postprocessing",
sink = "frame_processed",
config = {messageType = "postprocessing", stage = 4}
}
}
-- Create pipeline coordination
for _, stage in ipairs(pipelineStages) do
local handler = output:addHandler(stage.name, stage.uri, stage.sink, stage.config)
if handler then
print("Analytics pipeline stage configured: " .. stage.name)
end
end
See Also¶
- Output Plugins Overview
- Output Plugin - Main output coordination system
- TopBus Plugin - Alternative messaging system
- MQTT Plugin - External MQTT messaging
- WriteData Plugin - File-based output
- Plugin Development Guide - Creating custom plugins
- System Architecture - CVEDIA-RT architecture overview