TopBus Plugin
Description
The TopBus plugin provides high-performance message bus communication through the Output plugin system. It registers the topbus:// URI scheme and serves as an output handler for inter-process communication, event distribution, and data exchange between CVEDIA-RT components. It supports shared memory communication and integrates with VANA (Video Analytics) data structures, with additional PTZ (Pan-Tilt-Zoom) camera control capabilities.
Key Features
- High-Performance Message Bus: Advanced messaging system with shared memory support
- Output Handler Integration: Registers the topbus:// URI scheme with the Output plugin
- Multi-Sink Support: Handles various data types (source_info, eventsExport, locked_tracks, PTZ control)
- VANA Data Structure Integration: Enhanced integration with Video Analytics data formats
- PTZ Camera Control: Specialized handling of PTZ camera control and idle detection
- Shared Memory Communication: High-throughput data exchange using shared memory
- Event Distribution: Efficient event routing between system components
- Track Management: Advanced tracking data management and movement events
- Frame Processing Coordination: Synchronized frame processing workflows
- Singleton Architecture: Centralized TopBusCore instance for system-wide coordination
When to Use
- High-performance inter-process communication within CVEDIA-RT
- Advanced event distribution with shared memory optimization
- Integration with VANA (Video Analytics) systems requiring high throughput
- PTZ camera control and coordination workflows
- Performance-critical messaging for real-time applications
- Large-scale data exchange between distributed components
- Specialized messaging for enterprise CVEDIA-RT deployments
Requirements
Software Dependencies
- CVEDIA-RT core system
- TopBus core libraries and messaging infrastructure
- Shared memory support (OS-level)
- VANA data structure support (if using analytics features)
- PTZ camera control libraries (if using PTZ features)
System Requirements
- Sufficient shared memory for high-throughput messaging
- CPU resources for message routing and processing
- Memory for message buffering and shared memory segments
- Inter-process communication capabilities
- PTZ camera hardware support (if using PTZ features)
Configuration
The TopBus plugin is configured through the Output plugin system using the topbus:// URI scheme.
Basic Configuration
{
  "output": {
    "handlers": {
      "topbus-events": {
        "uri": "topbus://analytics-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "analytics",
          "useSharedMemory": true,
          "bufferSize": 2000
        }
      }
    }
  }
}
Advanced Multi-Sink Configuration
{
  "output": {
    "handlers": {
      "topbus-source-info": {
        "uri": "topbus://source-coordinator",
        "sink": "source_info",
        "enabled": true,
        "config": {
          "messageType": "source_management",
          "useSharedMemory": true
        }
      },
      "topbus-events": {
        "uri": "topbus://events-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "analytics",
          "useSharedMemory": true,
          "enableVanaFormat": true,
          "bufferSize": 3000
        }
      },
      "topbus-tracking": {
        "uri": "topbus://tracking-coordinator",
        "sink": "locked_tracks",
        "enabled": true,
        "config": {
          "messageType": "tracking",
          "useSharedMemory": true,
          "trackingOptimized": true
        }
      },
      "topbus-ptz": {
        "uri": "topbus://ptz-controller",
        "sink": "ptz_idle_request",
        "enabled": true,
        "config": {
          "messageType": "ptz_control",
          "ptzEnabled": true,
          "idleDetection": true
        }
      }
    }
  }
}
High-Performance Configuration
{
  "output": {
    "handlers": {
      "topbus-high-perf": {
        "uri": "topbus://high-performance-hub",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "high_performance",
          "useSharedMemory": true,
          "bufferSize": 5000,
          "sharedMemorySize": "64MB",
          "enableVanaFormat": true,
          "trackingEnabled": true,
          "ptzEnabled": true,
          "performanceMode": "maximum",
          "messageCompression": true,
          "priorityQueue": true
        }
      }
    }
  }
}
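The sharedMemorySize value is a string such as "64MB", and this page does not say how it is converted to a byte count. The Python sketch below shows one plausible reading, assuming binary units (1 MB = 1024 × 1024 bytes, the same factor the monitoring example on this page uses when it reports usage in MB). The helper name parse_size and the accepted unit suffixes are hypothetical, not part of TopBus.

```python
import re

# Hypothetical helper: binary multipliers are an assumption, not documented TopBus behaviour.
_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_size(text: str) -> int:
    """Convert a size string such as "64MB" into a byte count."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B)\s*", text, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unrecognised size string: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit.upper()]
```

Checking the string before deployment catches typos such as "64 M" early, rather than at handler start-up.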
Configuration Schema
Parameter | Type | Default | Description |
---|---|---|---|
uri | string | required | TopBus URI (topbus://path) |
sink | string | required | Specific sink type for TopBus processing |
enabled | boolean | true | Enable/disable TopBus output |
config.messageType | string | "generic" | Type of messages to handle |
config.useSharedMemory | boolean | true | Enable shared memory communication |
config.bufferSize | integer | 2000 | Message buffer size |
config.sharedMemorySize | string | "32MB" | Shared memory segment size |
config.enableVanaFormat | boolean | false | Enable VANA data structure format |
config.trackingEnabled | boolean | true | Enable tracking data processing |
config.ptzEnabled | boolean | false | Enable PTZ camera control features |
config.idleDetection | boolean | false | Enable PTZ idle detection |
config.performanceMode | string | "balanced" | Performance mode (balanced, maximum, efficient) |
config.messageCompression | boolean | false | Enable message compression |
config.priorityQueue | boolean | false | Enable priority-based message queuing |
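A handler entry can be checked against this table before it is deployed. The Python sketch below transcribes the types and defaults from the table and reports anything that does not match. It is an offline illustration only: check_handler is a hypothetical name, and how the plugin itself treats unknown or mistyped keys is not stated on this page.

```python
# Types and defaults transcribed from the Configuration Schema table.
SCHEMA = {
    "messageType": (str, "generic"),
    "useSharedMemory": (bool, True),
    "bufferSize": (int, 2000),
    "sharedMemorySize": (str, "32MB"),
    "enableVanaFormat": (bool, False),
    "trackingEnabled": (bool, True),
    "ptzEnabled": (bool, False),
    "idleDetection": (bool, False),
    "performanceMode": (str, "balanced"),
    "messageCompression": (bool, False),
    "priorityQueue": (bool, False),
}
PERFORMANCE_MODES = {"balanced", "maximum", "efficient"}


def check_handler(handler: dict) -> tuple:
    """Return (config with defaults applied, list of problems found)."""
    problems = []
    for key in ("uri", "sink"):
        if not isinstance(handler.get(key), str):
            problems.append(f"missing required string: {key}")
    uri = handler.get("uri")
    if isinstance(uri, str) and not uri.startswith("topbus://"):
        problems.append("uri does not use the topbus:// scheme")
    config = {key: default for key, (_, default) in SCHEMA.items()}
    for key, value in handler.get("config", {}).items():
        if key not in SCHEMA:
            problems.append(f"config.{key} is not in the schema table")
            continue
        expected_type = SCHEMA[key][0]
        # bool is a subclass of int in Python, so reject it explicitly for integers.
        wrong_type = not isinstance(value, expected_type)
        if wrong_type or (expected_type is int and isinstance(value, bool)):
            problems.append(f"config.{key} should be {expected_type.__name__}")
            continue
        config[key] = value
    if config["performanceMode"] not in PERFORMANCE_MODES:
        problems.append("config.performanceMode is not one of balanced/maximum/efficient")
    return config, problems
```

Several keys used in examples on this page (trackingOptimized, realTimeOptimized, eventFiltering, and others) do not appear in the table, so a check like this would flag them. Whether the plugin honours or ignores them is not documented here.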
URI Scheme
The TopBus plugin registers the topbus:// URI scheme with the Output plugin system.
URI Format
topbus://[path][?options]
Components:
- path: Optional path identifier for message routing and hub identification
- options: URL-encoded query parameters for configuration
URI Examples
topbus://analytics-hub # Analytics data hub
topbus://high-performance-comm # High-performance communication
topbus://tracking-coordinator # Tracking coordination hub
topbus://vana-integration # VANA system integration
topbus://ptz-controller # PTZ camera control hub
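To make the topbus://[path][?options] format concrete, the Python sketch below splits a URI into its path and decoded options using the standard urllib.parse module. The function name split_topbus_uri and the option names in the usage line are hypothetical. This page does not list which query options TopBus recognises.

```python
from urllib.parse import urlsplit, parse_qs


def split_topbus_uri(uri: str) -> dict:
    """Split topbus://[path][?options] into its hub path and option dictionary."""
    parts = urlsplit(uri)
    if parts.scheme != "topbus":
        raise ValueError(f"not a topbus:// URI: {uri!r}")
    # urlsplit puts the first segment after "//" in netloc, so rejoin it with the rest.
    path = parts.netloc + parts.path
    # parse_qs URL-decodes values and returns a list per key; keep the last value.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {"path": path, "options": options}
```

For example, split_topbus_uri("topbus://hub/sub?mode=fast") yields the path "hub/sub" and the options {"mode": "fast"}; here mode is an illustrative option name only.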
Supported Sink Types
The TopBus plugin handles specific sink types with specialized high-performance processing:
Source Information (source_info)
- Purpose: Initialize or update triggers based on source data
- Processing: High-performance source configuration and trigger setup
- Data Format: Source metadata and configuration information
- Optimization: Shared memory for rapid configuration updates
Events Export (eventsExport)
- Purpose: Process analytics events for high-throughput distribution
- Processing: Prepares VANA data structure with shared memory optimization
- Data Format: Structured analytics events and detections
- Optimization: Bulk processing and shared memory transfers
Locked Tracks (locked_tracks)
- Purpose: Manage persistent tracking data with high performance
- Processing: Adds locked tracks to VANA data structure using shared memory
- Data Format: Track objects with locked/confirmed status
- Optimization: Efficient track state management
Movement Tracks (object_left_removed_motion_tracks)
- Purpose: Handle object movement and state changes with low latency
- Processing: Tracks from object left/removed triggers with shared memory
- Data Format: Movement tracking data and object state changes
- Optimization: Real-time movement tracking updates
Frame Processed (frame_processed)
- Purpose: Coordinate frame processing completion with synchronization
- Processing: Processes VANA data and clears temporary data efficiently
- Data Format: Frame processing status and cleanup signals
- Optimization: Synchronized frame processing coordination
PTZ Idle Request (ptz_idle_request)
- Purpose: PTZ camera control and idle detection
- Processing: Checks PTZ movement status and sets idle flags
- Data Format: PTZ status, movement detection, idle state
- Optimization: Real-time PTZ control coordination
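The six sink names line up with the handle...Sink methods that the C++ API reference on this page lists for TopBusCore. The Python sketch below illustrates that routing as a dispatch table. The pairing is inferred from the method names and is not confirmed by this page, and RecordingCore is a hypothetical stand-in used only to show where each write would land.

```python
# Pairing inferred from method names in the C++ API listing; treat it as an assumption.
SINK_HANDLERS = {
    "source_info": "handleSourceSink",
    "eventsExport": "handleEventsExportSink",
    "locked_tracks": "handleLockedTracksSink",
    "object_left_removed_motion_tracks": "handleMovementTracksSink",
    "frame_processed": "handleFrameProcessedSink",
    "ptz_idle_request": "handlePTZIdleSink",
}


class RecordingCore:
    """Stand-in for TopBusCore that records which handler each write reached."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only invoked for attributes that do not exist, i.e. the handler names.
        if name not in SINK_HANDLERS.values():
            raise AttributeError(name)

        def handler(sink_data):
            self.calls.append((name, sink_data))
            return True

        return handler


def write(core, sink: str, sink_data) -> bool:
    """Route sink_data to the handler registered for the given sink name."""
    try:
        method_name = SINK_HANDLERS[sink]
    except KeyError:
        raise ValueError(f"unsupported sink: {sink!r}") from None
    return getattr(core, method_name)(sink_data)
```

A table like this makes an unsupported sink name fail loudly at write time instead of being silently dropped.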
API Reference
C++ API
The TopBus plugin implements the iface::OutputHandler interface with high-performance messaging capabilities:
class TopBusOutputHandler : public iface::OutputHandler {
public:
    // Constructor
    TopBusOutputHandler(const std::string& moduleName,
                        const std::string& schema,
                        const std::string& sink,
                        const std::string& path,
                        const pCValue& config);
    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink,
        const std::string& path,
        pCValue config);
    // High-performance message operations
    expected<bool> write(pCValue sinkData = VAL(),
                         std::string dynamicPath = "") override;
    void stop() override;
    void close() override;
    // Handler information
    std::string getSink() override;
};
TopBus Core Classes
// Singleton high-performance message bus core
class TopBusCore {
public:
    // Singleton access
    static TopBusCore* getInstance();
    // High-performance sink handlers
    expected<bool> handleSourceSink(pCValue sinkData);
    expected<bool> handleEventsExportSink(pCValue sinkData);
    expected<bool> handleLockedTracksSink(pCValue sinkData);
    expected<bool> handleMovementTracksSink(pCValue sinkData);
    expected<bool> handleFrameProcessedSink(pCValue sinkData);
    expected<bool> handlePTZIdleSink(pCValue sinkData);
    // Shared memory operations
    bool initializeSharedMemory(size_t size);
    bool writeToSharedMemory(const std::string& topic, pCValue data);
    pCValue readFromSharedMemory(const std::string& topic);
    void cleanupSharedMemory();
    // High-performance messaging
    bool publishMessage(const std::string& topic, pCValue data, bool useSharedMemory = true);
    bool subscribeToTopic(const std::string& topic, MessageHandler handler);
    void unsubscribeFromTopic(const std::string& topic);
    // VANA integration with performance optimization
    bool prepareVanaData(pCValue eventData);
    void clearTemporaryData();
    bool optimizeVanaProcessing();
    // PTZ control integration
    bool updatePTZStatus(pCValue ptzData);
    bool checkPTZIdle();
    void setPTZIdleFlag(bool idle);
};

// Managed TopBus interface with high-performance features
class TopBusManaged : public iface::TopBus {
public:
    // Factory method
    static std::unique_ptr<iface::TopBus> create(const std::string& moduleName);
    // High-performance message operations
    bool sendMessage(const std::string& destination, pCValue message, bool useSharedMemory = true);
    pCValue receiveMessage(const std::string& source);
    bool broadcast(pCValue message, bool useSharedMemory = true);
    // Subscription management
    bool subscribe(const std::string& topic);
    bool unsubscribe(const std::string& topic);
    // Performance monitoring
    size_t getMessageQueueSize();
    double getMessageThroughput();
    size_t getSharedMemoryUsage();
};

// Shared memory utilities
class TV_SHM {
public:
    // Shared memory segment management
    bool createSegment(const std::string& name, size_t size);
    bool attachSegment(const std::string& name);
    void* getSegmentPointer();
    bool detachSegment();
    void destroySegment();
    // Data operations
    bool writeData(const void* data, size_t size, size_t offset = 0);
    bool readData(void* buffer, size_t size, size_t offset = 0);
    size_t getSegmentSize();
};
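The TV_SHM methods follow the usual shared memory lifecycle: create, attach, write, read, detach, destroy. The Python sketch below walks through the same lifecycle with the standard multiprocessing.shared_memory module. It is an analogy only: it does not call TopBus, and the mapping of each step to a TV_SHM method (shown in the comments) is an assumption based on the method names.

```python
from multiprocessing import shared_memory


def round_trip(payload: bytes, offset: int = 0) -> bytes:
    """Create a segment, write payload at offset, read it back via a second attachment."""
    size = offset + len(payload)
    owner = shared_memory.SharedMemory(create=True, size=size)       # cf. createSegment
    try:
        owner.buf[offset:offset + len(payload)] = payload            # cf. writeData
        reader = shared_memory.SharedMemory(name=owner.name)         # cf. attachSegment
        try:
            return bytes(reader.buf[offset:offset + len(payload)])   # cf. readData
        finally:
            reader.close()                                           # cf. detachSegment
    finally:
        owner.close()
        owner.unlink()                                               # cf. destroySegment
```

The nested try/finally blocks mirror the cleanup order a real client needs: every attachment is closed, and only the owner removes the segment.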
Lua API
The TopBus plugin is accessed either through the Output plugin system or through a standalone factory:
-- Via Output plugin system (recommended for high-performance data distribution)
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusCoordinator")
-- Add TopBus output handler for high-performance analytics
local topbusHandler = output:addHandler(
    "topbus-analytics",
    "topbus://analytics-hub",
    "eventsExport",
    {
        messageType = "analytics",
        useSharedMemory = true,
        bufferSize = 3000,
        enableVanaFormat = true
    }
)
-- Add TopBus handler for PTZ control
local ptzHandler = output:addHandler(
    "topbus-ptz",
    "topbus://ptz-controller",
    "ptz_idle_request",
    {
        messageType = "ptz_control",
        ptzEnabled = true,
        idleDetection = true
    }
)
-- Standalone TopBus client (for direct high-performance messaging)
local topbus = api.factory.topbus.create("TopBusClient")
-- High-performance message operations
-- (messageData and broadcastData are placeholders for values prepared by the caller)
topbus:sendMessage("destination", messageData, true) -- true = use shared memory
local receivedMessage = topbus:receiveMessage("source")
topbus:broadcast(broadcastData, true) -- true = use shared memory
-- Subscription management
topbus:subscribe("analytics/events")
topbus:unsubscribe("analytics/events")
-- Performance monitoring
local queueSize = topbus:getMessageQueueSize()
local throughput = topbus:getMessageThroughput()
local memoryUsage = topbus:getSharedMemoryUsage()
Note: TopBus operates as both a high-performance output handler integrated with the Output plugin system and a standalone messaging interface for direct inter-process communication with shared memory optimization.
Examples
High-Performance Analytics Distribution
-- Create output instance for TopBus high-performance event distribution
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusHighPerfDistributor")
-- Configure TopBus for high-throughput analytics
local highPerfConfig = {
    messageType = "high_performance_analytics",
    useSharedMemory = true,
    bufferSize = 5000,
    sharedMemorySize = "64MB",
    enableVanaFormat = true,
    performanceMode = "maximum",
    messageCompression = true
}
local highPerfHandler = output:addHandler(
    "topbus-high-perf",
    "topbus://high-performance-analytics",
    "eventsExport",
    highPerfConfig
)
if highPerfHandler then
    api.logging.LogInfo("TopBus high-performance analytics configured")
    api.logging.LogInfo("Hub: high-performance-analytics")
    api.logging.LogInfo("Shared Memory: 64MB")
    api.logging.LogInfo("Performance Mode: Maximum")
    api.logging.LogInfo("Message Compression: Enabled")
    -- Analytics events will be distributed through shared memory
    -- for maximum throughput and minimal latency
else
    api.logging.LogError("Failed to create TopBus high-performance handler")
end
Multi-Component High-Performance Coordination
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusAdvancedCoordinator")
-- Configure multiple TopBus handlers for different high-performance workflows
local topbusHandlers = {
    {
        name = "source-coordinator",
        uri = "topbus://source-management",
        sink = "source_info",
        config = {
            messageType = "source_coordination",
            useSharedMemory = true,
            bufferSize = 1000
        }
    },
    {
        name = "analytics-distributor",
        uri = "topbus://analytics-hub",
        sink = "eventsExport",
        config = {
            messageType = "analytics",
            useSharedMemory = true,
            bufferSize = 4000,
            enableVanaFormat = true,
            performanceMode = "maximum"
        }
    },
    {
        name = "tracking-manager",
        uri = "topbus://tracking-coordinator",
        sink = "locked_tracks",
        config = {
            messageType = "tracking",
            useSharedMemory = true,
            trackingOptimized = true,
            bufferSize = 2000
        }
    },
    {
        name = "movement-tracker",
        uri = "topbus://movement-coordinator",
        sink = "object_left_removed_motion_tracks",
        config = {
            messageType = "movement_tracking",
            useSharedMemory = true,
            realTimeOptimized = true
        }
    },
    {
        name = "frame-processor",
        uri = "topbus://frame-sync",
        sink = "frame_processed",
        config = {
            messageType = "frame_coordination",
            useSharedMemory = true,
            syncOptimized = true
        }
    },
    {
        name = "ptz-controller",
        uri = "topbus://ptz-control",
        sink = "ptz_idle_request",
        config = {
            messageType = "ptz_control",
            ptzEnabled = true,
            idleDetection = true,
            realTimeControl = true
        }
    }
}
-- Create all TopBus high-performance coordination handlers
local activeHandlers = {}
for _, handler in ipairs(topbusHandlers) do
    local topbusHandler = output:addHandler(
        handler.name,
        handler.uri,
        handler.sink,
        handler.config
    )
    if topbusHandler then
        table.insert(activeHandlers, handler.name)
        api.logging.LogInfo("TopBus handler created: " .. handler.name)
        api.logging.LogInfo("  URI: " .. handler.uri)
        api.logging.LogInfo("  Sink: " .. handler.sink)
        api.logging.LogInfo("  Shared Memory: " .. (handler.config.useSharedMemory and "Enabled" or "Disabled"))
        api.logging.LogInfo("  Performance: " .. (handler.config.performanceMode or "Standard"))
    else
        api.logging.LogError("Failed to create TopBus handler: " .. handler.name)
    end
end
api.logging.LogInfo("Active TopBus high-performance handlers: " .. #activeHandlers)
PTZ Camera Integration
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusPTZIntegration")
-- Configure TopBus for PTZ camera control and coordination
local ptzConfig = {
    messageType = "ptz_coordination",
    ptzEnabled = true,
    idleDetection = true,
    realTimeControl = true,
    useSharedMemory = true,
    bufferSize = 500
}
local ptzHandler = output:addHandler(
    "ptz-coordinator",
    "topbus://ptz-control-hub",
    "ptz_idle_request",
    ptzConfig
)
-- Also configure analytics coordination with PTZ
local analyticsWithPTZHandler = output:addHandler(
    "analytics-ptz-coord",
    "topbus://analytics-ptz-coordination",
    "eventsExport",
    {
        messageType = "analytics_with_ptz",
        useSharedMemory = true,
        enableVanaFormat = true,
        ptzIntegration = true,
        bufferSize = 2000
    }
)
if ptzHandler and analyticsWithPTZHandler then
    api.logging.LogInfo("TopBus PTZ integration configured")
    api.logging.LogInfo("PTZ Control Hub: ptz-control-hub")
    api.logging.LogInfo("Analytics-PTZ Coordination: analytics-ptz-coordination")
    api.logging.LogInfo("Idle Detection: Enabled")
    api.logging.LogInfo("Real-time Control: Enabled")
    -- Function to monitor PTZ status (defined as a global so it can be called later)
    function monitorPTZStatus()
        local topbus = api.factory.topbus.create("PTZMonitor")
        topbus:subscribe("ptz/status")
        local ptzStatus = topbus:receiveMessage("ptz/status")
        if ptzStatus then
            api.logging.LogInfo("PTZ Status Update:")
            api.logging.LogInfo("  Position: " .. (ptzStatus.position or "unknown"))
            api.logging.LogInfo("  Moving: " .. (ptzStatus.moving and "Yes" or "No"))
            api.logging.LogInfo("  Idle: " .. (ptzStatus.idle and "Yes" or "No"))
        end
    end
else
    api.logging.LogError("Failed to create TopBus PTZ integration")
end
VANA Integration with Shared Memory
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusVANAIntegration")
-- Configure TopBus for high-performance VANA system integration
local vanaConfig = {
    messageType = "vana_analytics",
    useSharedMemory = true,
    bufferSize = 8000,
    sharedMemorySize = "128MB",
    enableVanaFormat = true,
    trackingEnabled = true,
    performanceMode = "maximum",
    messageCompression = true,
    priorityQueue = true,
    eventFiltering = {
        enabled = true,
        allowedTypes = {"intrusion", "line_crossing", "object_detection", "abandoned_object", "crowd_detection"}
    }
}
local vanaHandler = output:addHandler(
    "vana-high-perf",
    "topbus://vana-high-performance",
    "eventsExport",
    vanaConfig
)
-- Configure additional VANA workflow handlers
local vanaWorkflowHandlers = {
    {
        name = "vana-tracking",
        uri = "topbus://vana-tracking-hub",
        sink = "locked_tracks",
        config = {
            messageType = "vana_tracking",
            useSharedMemory = true,
            enableVanaFormat = true,
            trackingOptimized = true
        }
    },
    {
        name = "vana-movement",
        uri = "topbus://vana-movement-hub",
        sink = "object_left_removed_motion_tracks",
        config = {
            messageType = "vana_movement",
            useSharedMemory = true,
            enableVanaFormat = true,
            realTimeOptimized = true
        }
    },
    {
        name = "vana-frame-sync",
        uri = "topbus://vana-frame-sync",
        sink = "frame_processed",
        config = {
            messageType = "vana_frame_sync",
            useSharedMemory = true,
            enableVanaFormat = true,
            syncOptimized = true
        }
    }
}
-- Create VANA workflow handlers
local vanaHandlers = {}
for _, config in ipairs(vanaWorkflowHandlers) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(vanaHandlers, config.name)
        api.logging.LogInfo("VANA handler created: " .. config.name)
    end
end
if vanaHandler and #vanaHandlers == #vanaWorkflowHandlers then
    api.logging.LogInfo("Complete VANA high-performance workflow established")
    api.logging.LogInfo("Shared Memory: 128MB")
    api.logging.LogInfo("Performance Mode: Maximum")
    api.logging.LogInfo("Message Compression: Enabled")
    api.logging.LogInfo("Priority Queue: Enabled")
    api.logging.LogInfo("Event Filtering: Advanced")
end
Enterprise-Scale Deployment
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "TopBusEnterpriseDeployment")
-- Configure TopBus for enterprise-scale deployment
local enterpriseConfigs = {
    {
        name = "enterprise-analytics",
        uri = "topbus://enterprise-analytics-hub",
        sink = "eventsExport",
        config = {
            messageType = "enterprise_analytics",
            useSharedMemory = true,
            bufferSize = 10000,
            sharedMemorySize = "256MB",
            enableVanaFormat = true,
            performanceMode = "maximum",
            messageCompression = true,
            priorityQueue = true,
            enterpriseFeatures = true
        }
    },
    {
        name = "enterprise-tracking",
        uri = "topbus://enterprise-tracking-hub",
        sink = "locked_tracks",
        config = {
            messageType = "enterprise_tracking",
            useSharedMemory = true,
            bufferSize = 5000,
            trackingOptimized = true,
            enterpriseScale = true
        }
    },
    {
        name = "enterprise-ptz",
        uri = "topbus://enterprise-ptz-hub",
        sink = "ptz_idle_request",
        config = {
            messageType = "enterprise_ptz",
            ptzEnabled = true,
            idleDetection = true,
            realTimeControl = true,
            enterpriseCoordination = true
        }
    }
}
-- Create enterprise-scale handlers
local enterpriseHandlers = {}
for _, config in ipairs(enterpriseConfigs) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(enterpriseHandlers, config.name)
        api.logging.LogInfo("Enterprise handler created: " .. config.name)
        api.logging.LogInfo("  Shared Memory: " .. (config.config.sharedMemorySize or "Standard"))
        -- bufferSize is optional (the PTZ handler omits it), so guard against nil
        api.logging.LogInfo("  Buffer Size: " .. (config.config.bufferSize or "Default"))
        api.logging.LogInfo("  Performance Mode: " .. (config.config.performanceMode or "Standard"))
    end
end
-- Monitor enterprise deployment performance
function monitorEnterprisePerformance()
    local topbus = api.factory.topbus.create("EnterpriseMonitor")
    api.logging.LogInfo("Enterprise TopBus Performance Status:")
    api.logging.LogInfo("  Active Handlers: " .. #enterpriseHandlers)
    api.logging.LogInfo("  Queue Size: " .. topbus:getMessageQueueSize())
    api.logging.LogInfo("  Throughput: " .. string.format("%.2f msg/sec", topbus:getMessageThroughput()))
    api.logging.LogInfo("  Shared Memory Usage: " .. string.format("%.2f MB", topbus:getSharedMemoryUsage() / (1024*1024)))
end
api.logging.LogInfo("Enterprise TopBus deployment configured: " .. #enterpriseHandlers .. " handlers")
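The monitoring function reports throughput in messages per second, but this page does not say how getMessageThroughput() derives that figure. One common approach is a sliding time window, sketched below in Python. ThroughputMeter and its window length are hypothetical and are not part of the TopBus API; timestamps are passed in explicitly so the logic is easy to follow.

```python
from collections import deque


class ThroughputMeter:
    """Messages per second over a sliding time window."""

    def __init__(self, window_seconds: float = 5.0):
        self.window = window_seconds
        self.stamps = deque()

    def record(self, now: float) -> None:
        """Note that one message was handled at time `now` (seconds)."""
        self.stamps.append(now)

    def rate(self, now: float) -> float:
        """Drop timestamps that fell out of the window, then average over the window."""
        while self.stamps and self.stamps[0] <= now - self.window:
            self.stamps.popleft()
        return len(self.stamps) / self.window
```

A windowed rate reacts to recent load only, which suits a threshold alert better than a lifetime average would.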
Standalone High-Performance Messaging
-- For direct TopBus high-performance messaging (not through Output plugin)
local instance = api.thread.getCurrentInstance()
local topbus = api.factory.topbus.create("DirectTopBusClient")
-- Subscribe to high-performance analytics events
topbus:subscribe("analytics/high-perf-events")
topbus:subscribe("tracking/real-time-updates")
topbus:subscribe("ptz/control-commands")
topbus:subscribe("system/performance-metrics")
-- High-performance message processing function
function processTopBusMessages()
    -- Check for high-performance analytics events
    local analyticsMessage = topbus:receiveMessage("analytics/high-perf-events")
    if analyticsMessage then
        api.logging.LogInfo("High-performance analytics event:")
        api.logging.LogInfo("  Type: " .. analyticsMessage.type)
        api.logging.LogInfo("  Confidence: " .. analyticsMessage.confidence)
        api.logging.LogInfo("  Processing Time: " .. (analyticsMessage.processing_time or "N/A") .. "ms")
    end
    -- Check for real-time tracking updates
    local trackingMessage = topbus:receiveMessage("tracking/real-time-updates")
    if trackingMessage then
        api.logging.LogInfo("Real-time tracking update:")
        api.logging.LogInfo("  Track ID: " .. trackingMessage.track_id)
        api.logging.LogInfo("  Status: " .. trackingMessage.status)
        api.logging.LogInfo("  Position: (" .. trackingMessage.x .. ", " .. trackingMessage.y .. ")")
    end
    -- Check for PTZ control commands
    local ptzMessage = topbus:receiveMessage("ptz/control-commands")
    if ptzMessage then
        api.logging.LogInfo("PTZ control command:")
        api.logging.LogInfo("  Command: " .. ptzMessage.command)
        api.logging.LogInfo("  Camera ID: " .. ptzMessage.camera_id)
        api.logging.LogInfo("  Parameters: " .. (ptzMessage.parameters or "None"))
    end
    -- Check for system performance metrics
    local perfMessage = topbus:receiveMessage("system/performance-metrics")
    if perfMessage then
        api.logging.LogInfo("Performance metrics:")
        api.logging.LogInfo("  Throughput: " .. perfMessage.throughput .. " msg/sec")
        api.logging.LogInfo("  Latency: " .. perfMessage.latency .. "ms")
        api.logging.LogInfo("  Memory Usage: " .. perfMessage.memory_usage .. "MB")
    end
end
-- Send high-performance status updates
function sendHighPerfStatus(status)
    local statusMessage = {
        timestamp = os.time(),
        component = "analytics-engine",
        instance_id = instance:getName(),
        status = status,
        performance_metrics = {
            -- fps = status.fps, -- Use from provided status
            -- memory_usage = status.memory_usage, -- Use from provided status
            -- cpu_usage = status.cpu_usage, -- Use from provided status
            shared_memory_usage = topbus:getSharedMemoryUsage(),
            message_throughput = topbus:getMessageThroughput()
        }
    }
    -- Use shared memory for high-performance transmission
    topbus:sendMessage("system/performance-status", api.cvalue.create(statusMessage), true)
end
-- Broadcast high-priority alerts with shared memory
function broadcastHighPriorityAlert(alertType, message)
    local alert = {
        timestamp = os.time(),
        type = alertType,
        source = instance:getName(),
        message = message,
        priority = "critical",
        requires_immediate_action = true
    }
    -- Use shared memory for fastest delivery
    topbus:broadcast(api.cvalue.create(alert), true)
end
-- Times (in seconds) of the last status update and the last degradation alert
local lastStatusTime = 0
local lastAlertTime = 0
-- Main high-performance processing loop
function onTick()
    processTopBusMessages()
    local now = os.time()
    -- Send performance status updates every 30 seconds.
    -- Comparing elapsed time (rather than testing os.time() % 30 == 0) sends exactly
    -- one update per interval, however many ticks fall within the same second.
    if now - lastStatusTime >= 30 then
        lastStatusTime = now
        sendHighPerfStatus("running")
    end
    -- Monitor performance and alert if degraded, at most once every 30 seconds
    local throughput = topbus:getMessageThroughput()
    if throughput < 1000 and now - lastAlertTime >= 30 then -- Below expected threshold
        lastAlertTime = now
        broadcastHighPriorityAlert("performance_degradation", "TopBus throughput below threshold: " .. throughput .. " msg/sec")
    end
end
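Periodic work inside a tick loop is usually gated on elapsed time, so that it runs once per interval regardless of how often ticks arrive. The Python sketch below isolates that gating logic. The class name Every is hypothetical, and the clock value is passed in explicitly so the behaviour can be traced by hand.

```python
class Every:
    """Fire at most once per `interval` seconds, whatever the tick rate."""

    def __init__(self, interval: float):
        self.interval = interval
        self.last = None  # time of the most recent firing, or None before the first

    def due(self, now: float) -> bool:
        """Return True (and restart the interval) if enough time has elapsed."""
        if self.last is None or now - self.last >= self.interval:
            self.last = now
            return True
        return False
```

With a 30 second interval and ticks at 0, 0.1, 29.9, 30.0 and 61 seconds, the gate fires at 0, 30.0 and 61 only. The same object can also rate-limit alerts so that a sustained fault does not produce one broadcast per tick.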
Best Practices
High-Performance Architecture
- Shared Memory Utilization:
  - Enable shared memory for high-throughput scenarios
  - Configure appropriate shared memory segment sizes
  - Monitor shared memory usage and fragmentation
  - Implement proper shared memory cleanup
- Performance Mode Selection:
  - Use "maximum" performance mode for critical applications
  - Use "balanced" for general-purpose deployments
  - Use "efficient" for resource-constrained environments
  - Monitor performance impact of different modes
- Message Optimization:
  - Enable message compression for large data volumes
  - Use priority queues for critical message types
  - Implement message filtering to reduce unnecessary traffic
  - Monitor message queue sizes and processing rates
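To illustrate what priority-based queuing means for critical message types, here is a minimal Python sketch built on the standard heapq module. It shows the general technique only: how the priorityQueue option is implemented inside TopBus is not described on this page, and the class name and the numeric priority scale (lower number first) are assumptions.

```python
import heapq
import itertools


class PriorityMessageQueue:
    """Pop lower priority numbers first; keep FIFO order within a priority."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()

    def push(self, message, priority: int = 10) -> None:
        # The running counter breaks ties, so messages are never compared directly.
        heapq.heappush(self._heap, (priority, next(self._order), message))

    def pop(self):
        """Remove and return the most urgent message."""
        _priority, _sequence, message = heapq.heappop(self._heap)
        return message

    def __len__(self) -> int:
        return len(self._heap)
```

A critical event pushed with priority 0 overtakes routine messages that were queued earlier at the default priority, while routine messages keep their arrival order among themselves.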
Sink Selection and Optimization
- Event Distribution:
  - Use eventsExport for high-throughput analytics events
  - Enable VANA format for advanced analytics integration
  - Configure appropriate buffer sizes for event volume
  - Monitor event processing latency
- Tracking Management:
  - Use locked_tracks for persistent tracking data
  - Use object_left_removed_motion_tracks for movement events
  - Enable tracking optimization for better performance
  - Implement track state management
- PTZ Integration:
  - Use ptz_idle_request for PTZ camera control
  - Enable idle detection for AI coordination
  - Implement real-time PTZ control
  - Monitor PTZ status and responsiveness
Resource Management
- Memory Management:
  - Configure shared memory sizes based on data volume
  - Monitor memory usage and implement cleanup
  - Use appropriate buffer sizes for different workloads
  - Implement memory pressure handling
- CPU Optimization:
  - Monitor message processing CPU usage
  - Use multi-threading for parallel processing
  - Implement load balancing for multiple handlers
  - Optimize message routing algorithms
- System Integration:
  - Coordinate with system resource management
  - Implement graceful degradation under load
  - Monitor system performance impact
  - Use resource limits and quotas
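One simple form of graceful degradation under load is a fixed-capacity buffer that discards the oldest message when full and counts what it dropped. The Python sketch below shows the idea. It is a generic pattern, not a description of how TopBus applies bufferSize, and the class name BoundedBuffer is hypothetical.

```python
from collections import deque


class BoundedBuffer:
    """Fixed-capacity buffer that discards the oldest message when full."""

    def __init__(self, capacity: int):
        self._items = deque(maxlen=capacity)
        self.dropped = 0

    def push(self, message) -> None:
        if len(self._items) == self._items.maxlen:
            # deque(maxlen=...) evicts the oldest entry on append; count the loss.
            self.dropped += 1
        self._items.append(message)

    def drain(self) -> list:
        """Return and clear everything currently buffered, oldest first."""
        items = list(self._items)
        self._items.clear()
        return items
```

Exposing the dropped counter gives monitoring a direct signal that the buffer is undersized for the current load.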
Troubleshooting
High-Performance Issues
- Low message throughput
  - Cause: Suboptimal configuration or system constraints
  - Solution: Enable shared memory, increase buffer sizes, optimize performance mode
  - Monitor: Message throughput rates, queue sizes, processing latency
  - Optimize: Performance mode settings, shared memory configuration
- High memory usage
  - Cause: Large shared memory segments or buffer overflow
  - Solution: Optimize shared memory sizes, implement cleanup
  - Monitor: Shared memory usage, buffer utilization, memory fragmentation
  - Check: Memory leaks, cleanup procedures, segment management
- Message processing latency
  - Cause: Processing bottlenecks or system load
  - Solution: Enable priority queues, optimize message processing
  - Monitor: Processing times, queue depths, system load
  - Optimize: Priority settings, processing algorithms, resource allocation
Shared Memory Issues
- Shared memory creation fails
  - Cause: Insufficient system resources or permissions
  - Solution: Check system limits, adjust permissions
  - Check: System shared memory limits, user permissions, available memory
  - Debug: System logs, shared memory status, resource usage
- Shared memory corruption
  - Cause: Concurrent access issues or improper cleanup
  - Solution: Implement proper synchronization, cleanup procedures
  - Monitor: Data integrity, access patterns, corruption events
  - Debug: Memory access logs, synchronization issues
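Data integrity in a shared segment can be monitored by framing each payload with its length and a checksum, so a reader can detect a torn or overwritten message. The Python sketch below uses CRC-32 from the standard zlib module. The frame layout is an assumption made for illustration and is not the TopBus wire format.

```python
import struct
import zlib

# Hypothetical frame header: payload length, then CRC-32 of the payload (little-endian).
HEADER = struct.Struct("<II")


def frame(payload: bytes) -> bytes:
    """Prefix payload with its length and checksum before writing it to a segment."""
    return HEADER.pack(len(payload), zlib.crc32(payload)) + payload


def unframe(buffer: bytes) -> bytes:
    """Return the payload, or raise ValueError if the frame fails its integrity check."""
    if len(buffer) < HEADER.size:
        raise ValueError("buffer shorter than frame header")
    length, checksum = HEADER.unpack_from(buffer)
    payload = bytes(buffer[HEADER.size:HEADER.size + length])
    if len(payload) != length or zlib.crc32(payload) != checksum:
        raise ValueError("frame failed integrity check")
    return payload
```

A checksum only detects corruption after the fact. Writers and readers still need proper synchronization to prevent it.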
PTZ Integration Issues
- PTZ control not responding
  - Cause: Communication issues or configuration problems
  - Solution: Check PTZ connectivity, verify configuration
  - Monitor: PTZ command response times, connection status
  - Debug: PTZ communication logs, command execution status
- Idle detection not working
  - Cause: PTZ status updates not received or processing issues
  - Solution: Verify PTZ status reporting, check processing logic
  - Monitor: PTZ status updates, idle detection events
  - Debug: Status message flow, idle detection logic
VANA Integration Issues
- VANA data format errors
  - Cause: Data structure incompatibilities or conversion issues
  - Solution: Verify VANA format requirements, update conversion logic
  - Check: Data format specifications, conversion algorithms
  - Debug: VANA data validation, format conversion logs
- Performance degradation with VANA
  - Cause: Complex data processing or large data volumes
  - Solution: Optimize VANA processing, enable shared memory
  - Monitor: VANA processing times, data throughput
  - Optimize: Processing algorithms, memory usage, data flow
Integration Examples
Enterprise Surveillance System
{
  "output": {
    "handlers": {
      "enterprise-analytics": {
        "uri": "topbus://enterprise-surveillance",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "enterprise_surveillance",
          "useSharedMemory": true,
          "bufferSize": 15000,
          "sharedMemorySize": "512MB",
          "enableVanaFormat": true,
          "performanceMode": "maximum",
          "messageCompression": true,
          "priorityQueue": true,
          "eventFiltering": {
            "enabled": true,
            "allowedTypes": ["intrusion", "unauthorized_access", "crowd_detection", "abandoned_object"],
            "minConfidence": 0.85
          }
        }
      },
      "ptz-coordination": {
        "uri": "topbus://enterprise-ptz",
        "sink": "ptz_idle_request",
        "enabled": true,
        "config": {
          "messageType": "ptz_enterprise",
          "ptzEnabled": true,
          "idleDetection": true,
          "realTimeControl": true,
          "enterpriseCoordination": true,
          "autoTracking": true
        }
      }
    }
  }
}
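The eventFiltering block in this configuration combines an allow-list of event types with a minimum confidence. The Python sketch below shows the semantics such a block suggests. It is an interpretation, not documented behaviour: the event fields type and confidence are taken from the standalone messaging example on this page, and treating a missing enabled key as true is an assumption.

```python
def passes_filter(event: dict, event_filter: dict) -> bool:
    """Apply an eventFiltering block (enabled, allowedTypes, minConfidence) to one event."""
    # Assumption: a filter block without an explicit "enabled" key is active.
    if not event_filter.get("enabled", True):
        return True
    allowed = event_filter.get("allowedTypes")
    if allowed is not None and event.get("type") not in allowed:
        return False
    # Events without a confidence value are treated as confidence 0.0.
    return event.get("confidence", 0.0) >= event_filter.get("minConfidence", 0.0)
```

Under this reading, an intrusion event at confidence 0.9 passes the filter above, while the same event at 0.5 or any event of an unlisted type is dropped.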
Real-Time Manufacturing Monitoring
{
  "output": {
    "handlers": {
      "manufacturing-monitor": {
        "uri": "topbus://manufacturing-realtime",
        "sink": "eventsExport",
        "enabled": true,
        "config": {
          "messageType": "manufacturing_monitoring",
          "useSharedMemory": true,
          "bufferSize": 8000,
          "sharedMemorySize": "256MB",
          "performanceMode": "maximum",
          "realTimeOptimized": true,
          "eventFiltering": {
            "allowedTypes": ["quality_issue", "safety_violation", "production_anomaly"],
            "realTimeOnly": true
          }
        }
      }
    }
  }
}
See Also
- Output Plugins Overview
- Output Plugin - Main output coordination system
- NBus Plugin - Standard messaging system
- MQTT Plugin - External MQTT messaging
- WriteData Plugin - File-based output
- GStreamerWriter Plugin - Video streaming
- System Architecture - CVEDIA-RT architecture overview
- Performance Tuning Guide - System optimization