MQTT Configuration
BlazeBee uses MQTT v3 for efficient, scalable data distribution, integrating smoothly with monitoring ecosystems for real-time visibility.
Purpose of This Page
This page describes BlazeBee's MQTT transport settings. Use it as a reference when tuning MQTT behavior.
Features
- Automatic Reconnection - Exponential backoff with configurable limits
- Connection State Monitoring - Watch channel for real-time connection state updates
- Multiple Serialization Formats - JSON, MessagePack, CBOR support
- Compression Support - Gzip and Zstd compression for efficient bandwidth usage
- TLS/SSL - Client certificate authentication and CA verification
- Configuration Management - Load from TOML, JSON, YAML with validation
- Comprehensive Error Handling - Categorized error types for proper recovery
- Async/Await - Full tokio integration with non-blocking operations
- Subscription Persistence - Automatic resubscription on reconnection
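The automatic reconnection feature above relies on exponential backoff with configurable limits. As a rough illustration of the idea, and not BlazeBee's actual Backoff type, the sketch below doubles the delay after every failed attempt and clamps it to a ceiling. The names `BackoffSketch`, `initial`, `max`, `next_delay`, and `reset` are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical exponential backoff; not the library's `Backoff` type.
pub struct BackoffSketch {
    initial: Duration,
    max: Duration,
    attempt: u32,
}

impl BackoffSketch {
    pub fn new(initial: Duration, max: Duration) -> Self {
        Self { initial, max, attempt: 0 }
    }

    /// Delay before the next attempt: initial * 2^attempt, capped at `max`.
    pub fn next_delay(&mut self) -> Duration {
        // Cap the exponent so the multiplier always fits in a u32.
        let factor = 2u32.saturating_pow(self.attempt.min(31));
        self.attempt = self.attempt.saturating_add(1);
        self.initial.saturating_mul(factor).min(self.max)
    }

    /// Call after a successful connection so the next outage starts from `initial`.
    pub fn reset(&mut self) {
        self.attempt = 0;
    }
}
```

With an initial delay of 100 ms and a ceiling of 1 s, successive calls to `next_delay` grow geometrically until they reach the ceiling, and `reset` starts the sequence over.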
Minimal Example

    use blazebee_mqtt_v3::{MqttManager, EndpointMetadata, Publisher};
    use serde::{Serialize, Deserialize};

    #[derive(Serialize, Deserialize)]
    struct SensorReading {
        temperature: f32,
        humidity: f32,
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Create MQTT manager
        let manager = MqttManager::new("localhost", 1883)?;
        let instance = std::sync::Arc::new(manager.build_and_start().await?);

        // Create publisher
        let publisher = Publisher::new(instance.clone());

        // Subscribe to topics
        instance.subscribe("sensor/+/data").await?;
        instance.start_monitoring().await?;

        // Publish a message
        let reading = SensorReading {
            temperature: 22.5,
            humidity: 55.0,
        };

        let metadata = EndpointMetadata {
            qos: 1,
            topic: "sensor/living_room".into(),
            retain: false,
        };

        publisher.publish(&reading, &metadata).await?;

        // Graceful shutdown
        instance.shutdown().await?;
        Ok(())
    }

Configuration
From TOML File
Create config.toml:

    [transport]
    client_id = "your-client-id"
    base_topic = "my-app"
    host = "mqtt.example.com"
    port = 8883
    clean_session = false
    keep_alive = 60
    max_inflight = 100

    [transport.tls]
    ca_cert_path = "/etc/mqtt/ca.pem"
    client_cert_path = "/etc/mqtt/client.crt"
    client_key_path = "/etc/mqtt/client.key"

    [transport.serialization]
    format = "msgpack"
    compression = "zstd"
    compression_threshold = 1024

Load it:

    // Read the file created above (config.toml) and parse it into a Config
    let config_text = std::fs::read_to_string("config.toml")?;
    let config: Config = toml::from_str(&config_text)?;
    let manager = MqttManager::from_config(config)?;

Programmatic Configuration

    use blazebee_mqtt_v3::{Config, MqttManager, TlsConfig};

    let config = Config {
        base_topic: "iot-app".into(),
        host: "broker.hivemq.com".into(),
        port: 8883,
        clean_session: false,
        max_inflight: 50,
        keep_alive: 60,
        client_id: "device_001".into(),
        max_packet_size: Some(65535),
        request_channel_capacity: Some(100),
        tls: Some(TlsConfig::with_ca_only("/etc/mqtt/ca.pem")),
    };

    let manager = MqttManager::from_config(config)?;

Usage Patterns
Pattern 1: Monitoring Connection State

    use mqtt_manager::ConnectionState;

    let mut state_rx = instance.supervisor().state_receiver();

    tokio::spawn(async move {
        loop {
            // The sender was dropped, so no further updates will arrive
            if state_rx.changed().await.is_err() {
                break;
            }

            match state_rx.borrow().clone() {
                ConnectionState::Connected => {
                    println!("Online");
                }
                ConnectionState::Disconnected(reason) => {
                    eprintln!("Offline: {}", reason);
                }
                ConnectionState::Reconnecting(secs) => {
                    println!("Reconnecting in {:.1}s", secs);
                }
                _ => {}
            }
        }
    });

Pattern 2: Publishing with Retry

    // Publish with up to five attempts, doubling the delay after each failure
    async fn publish_with_retry(
        publisher: &Publisher,
        data: &SensorReading,
        metadata: &EndpointMetadata,
    ) -> Result<(), Box<dyn std::error::Error>> {
        const MAX_ATTEMPTS: u32 = 5;
        let mut delay = std::time::Duration::from_millis(100);
        let mut attempt = 1;

        // `loop` (rather than `for`) ensures every path returns a Result
        loop {
            match publisher.publish(data, metadata).await {
                Ok(()) => return Ok(()),
                Err(e) if attempt < MAX_ATTEMPTS => {
                    eprintln!("Attempt {} failed: {}", attempt, e);
                    tokio::time::sleep(delay).await;
                    delay *= 2;
                    attempt += 1;
                }
                Err(e) => return Err(Box::new(e)),
            }
        }
    }

Pattern 3: Publisher Presets
Choose a preset configuration optimized for your use case:

    // Development/debugging: JSON, no compression
    let publisher = mqtt_manager::publisher::presets::minimal(instance);

    // Compact: MessagePack, no compression
    let publisher = mqtt_manager::publisher::presets::compact(instance);

    // Balanced: JSON + Gzip
    let publisher = mqtt_manager::publisher::presets::efficient(instance);

    // High-throughput: MessagePack + Zstd
    let publisher = mqtt_manager::publisher::presets::high_performance(instance);

    // Ultra-compact: CBOR + Zstd
    let publisher = mqtt_manager::publisher::presets::ultra_compact(instance);

Pattern 4: Custom Serialization

    use mqtt_manager::config::{SerializationConfig, SerializationFormat, CompressionType};

    let config = SerializationConfig {
        format: SerializationFormat::MessagePack,
        compression: CompressionType::Zstd,
        compression_threshold: 512,
    };

    let publisher = Publisher::with_config(instance, &config);

Architecture
The library follows a layered architecture:

    Application Code
        ↓
    MqttInstance (high-level API)
        ├─ Publisher (serialization, compression, framing)
        ├─ Supervisor (lifecycle management)
        └─ ConnectionKernel (event loop, reconnection)
            ├─ Backoff (exponential backoff)
            └─ rumqttc AsyncClient + EventLoop
                ↓
            Network (TCP/TLS)

Core Components
- MqttManager - Entry point, coordinates initialization
- MqttInstance - Active connection, provides public API
- Publisher - Handles serialization, compression, publishing
- ConnectionKernel - Manages event loop, reconnection, state
- SubscriptionManager - Tracks and resubscribes to topics
- Supervisor - Monitors connection lifecycle, publishes status
- Backoff - Exponential backoff algorithm with configurable limits
- Framer - Splits messages into frames
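ConnectionKernel and Supervisor both revolve around a connection state that moves between Connecting, Connected, Disconnected, and Reconnecting. The sketch below models only the transitions drawn in the Connection Lifecycle section as a pure function. It is an illustration, not the library's implementation: `State`, `Event`, and `step` are hypothetical names, and the real ConnectionState type may carry different data.

```rust
/// Hypothetical mirror of the states named on this page; not the library's `ConnectionState`.
#[derive(Debug, Clone, PartialEq)]
pub enum State {
    Connecting,
    Connected,
    Disconnected(String), // reason for the disconnect
    Reconnecting(f64),    // seconds until the next attempt
}

/// Illustrative events that drive the state machine.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    ConnAck,
    NetworkError(String),
    BackoffScheduled(f64),
    BackoffElapsed,
}

/// Applies one event; combinations not shown in the lifecycle diagram leave the state unchanged.
pub fn step(state: State, event: Event) -> State {
    match (state, event) {
        (State::Connecting, Event::ConnAck) => State::Connected,
        (State::Connected, Event::NetworkError(reason)) => State::Disconnected(reason),
        (State::Disconnected(_), Event::BackoffScheduled(secs)) => State::Reconnecting(secs),
        (State::Reconnecting(_), Event::BackoffElapsed) => State::Connecting,
        (unchanged, _) => unchanged,
    }
}
```

Keeping the transition logic in a pure function like this makes it testable without a broker; the event loop would only translate network activity into events.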
Error Handling
The library uses a unified TransferError type that categorizes all failures:

    use mqtt_manager::TransferError;

    match publisher.publish(&data, &metadata).await {
        Ok(()) => println!("Success"),

        // Configuration errors (fix and restart)
        Err(TransferError::ClientSetup(msg)) => {
            eprintln!("Setup failed: {}", msg);
            std::process::exit(1);
        }

        // Data serialization errors (fix data)
        Err(TransferError::Serialization(msg)) => {
            eprintln!("Serialization failed: {}", msg);
        }

        // Network errors (will retry automatically)
        Err(TransferError::ClientConnection(e)) => {
            eprintln!("Network error: {}", e);
        }

        // Retry policy exhausted (critical)
        Err(TransferError::RetriesPolicy(e)) => {
            eprintln!("Cannot reconnect: {}", e);
            std::process::exit(1);
        }

        Err(e) => eprintln!("Other error: {}", e),
    }

QoS Guarantees
This library supports all three MQTT QoS levels:
| QoS | Name | Guarantee | Use Case |
|---|---|---|---|
| 0 | At Most Once | Fire-and-forget | Sensors with frequent updates |
| 1 | At Least Once | Delivered once or more | Important notifications |
| 2 | Exactly Once | Delivered exactly once | Critical transactions |
Note: QoS is set per message in EndpointMetadata. For critical data, use QoS 1 or 2.
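Since EndpointMetadata takes the QoS as a plain number (`qos: 1` in the Minimal Example), it can help to validate that number before publishing. The sketch below maps a `u8` onto the three levels from the table and rejects anything else. `QosLevel` and `is_acknowledged` are hypothetical names used for illustration; they are not part of the library.

```rust
use std::convert::TryFrom;

/// Hypothetical typed view of the three MQTT QoS levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QosLevel {
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2,
}

impl TryFrom<u8> for QosLevel {
    type Error = String;

    /// Accepts 0, 1, or 2; any other value is not a valid MQTT QoS level.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(QosLevel::AtMostOnce),
            1 => Ok(QosLevel::AtLeastOnce),
            2 => Ok(QosLevel::ExactlyOnce),
            other => Err(format!("invalid QoS level: {}", other)),
        }
    }
}

impl QosLevel {
    /// True for QoS 1 and 2, where delivery is acknowledged.
    pub fn is_acknowledged(self) -> bool {
        self != QosLevel::AtMostOnce
    }
}
```

A caller could run `QosLevel::try_from(raw)` on values read from configuration and convert back with `level as u8` when filling in the metadata.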
Connection Lifecycle
The connection goes through these states:

    Connecting ──(CONNACK)──> Connected
                                  ↓
                           (network error)
                                  ↓
                             Disconnected
                                  ↓
                            Reconnecting(N)
                                  ↓
                              Connecting

TLS/SSL Configuration
CA-Only Verification (Most Common)

    let tls = TlsConfig::with_ca_only("/etc/mqtt/ca.pem");

Mutual TLS (Client Authentication)

    let tls = TlsConfig::new(
        "/etc/mqtt/ca.pem",
        "/etc/mqtt/client.crt",
        "/etc/mqtt/client.key",
    );

Security Best Practices:
- Keep private keys in secure storage (e.g., HashiCorp Vault)
- Set file permissions: chmod 600 on key files
- Never commit keys to version control
- Rotate certificates before expiry
- Use strong key sizes (2048+ bits for RSA)
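The file-permission advice above can also be checked at startup. The following is a minimal, Unix-only sketch that reports whether a key file grants any access to group or other users, which is what chmod 600 rules out. The function name `key_file_is_private` is hypothetical, and nothing here is part of the library's API.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

/// Returns Ok(true) when no group or other permission bits are set on the file.
pub fn key_file_is_private(path: &Path) -> io::Result<bool> {
    let mode = fs::metadata(path)?.permissions().mode();
    // 0o077 selects the group and other permission bits; they must all be clear.
    Ok((mode & 0o077) == 0)
}
```

An application could call this on the client key path before building the TLS configuration and refuse to start, or log a warning, when it returns false.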
Serialization Formats
Choose based on your requirements:
| Format | Size | Speed | Human-Readable | Use Case |
|---|---|---|---|---|
| JSON | Large (1.0x) | Slow | Yes | Debugging, development |
| MessagePack | Small (0.6x) | Fast | No | Production, bandwidth-limited |
| CBOR | Small (0.6x) | Medium | No | Interoperability |
Compression reduces size by 60-80% for text/structured data:
- Gzip: Standard, good compatibility
- Zstd: Faster, better ratio (recommended)
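The compression_threshold setting used elsewhere on this page interacts with the choice of compression. The sketch below shows one plausible reading, stated here as an assumption rather than confirmed library behavior: payloads shorter than the threshold (in bytes) are sent uncompressed, because compression overhead can outweigh the savings on small messages. `CompressionSketch` and `effective_compression` are hypothetical names.

```rust
/// Hypothetical stand-in for the library's compression setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionSketch {
    None,
    Gzip,
    Zstd,
}

/// Assumed semantics: payloads below `threshold` bytes skip compression,
/// everything else uses the configured algorithm.
pub fn effective_compression(
    payload_len: usize,
    configured: CompressionSketch,
    threshold: usize,
) -> CompressionSketch {
    if payload_len < threshold {
        CompressionSketch::None
    } else {
        configured
    }
}
```

Under this reading, a 200-byte reading with a threshold of 1024 goes out uncompressed, while a 4 KB batch is compressed with the configured algorithm.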
Performance
Typical performance on modern hardware (Intel i7, 8GB RAM):
| Metric | Value |
|---|---|
| Throughput | 1,000-2,000 msg/sec |
| Latency (network) | 10-100 ms |
| Memory (idle) | 5-10 MB |
| Memory (per subscription) | ~100 bytes |
| CPU (idle) | < 1% |
| CPU (loaded) | < 5% |
Optimization Tips:
- Use MessagePack + Zstd for maximum throughput
- Increase max_inflight for bursty traffic
- Increase request_channel_capacity to buffer more locally
- Use QoS 0 only for high-frequency, low-importance data
Thread Safety
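All public types are safe for concurrent use:

    let instance = Arc::new(instance);
    let publisher = Publisher::new(instance.clone());

    // Spawn multiple tasks publishing concurrently
    for _ in 0..10 {
        // `pub` is a reserved keyword, so the clone needs a different name
        let task_publisher = publisher.clone();
        // Each task gets its own copy (assumes `data` and `metadata` implement Clone)
        let (data, metadata) = (data.clone(), metadata.clone());
        tokio::spawn(async move {
            // Safe to publish from multiple tasks
            if let Err(e) = task_publisher.publish(&data, &metadata).await {
                eprintln!("Publish failed: {}", e);
            }
        });
    }

    // Or publish concurrently as a frame stream
    for _ in 0..10 {
        let task_publisher = publisher.clone();
        let (data, metadata) = (data.clone(), metadata.clone());
        tokio::spawn(async move {
            // Safe to publish from multiple tasks
            if let Err(e) = task_publisher.publish_framed(&data, &metadata).await {
                eprintln!("Framed publish failed: {}", e);
            }
        });
    }

Examples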
The repository includes several complete examples:
- ./crates/mqtt/v3/01_basic_publish_subscribe.rs - Simple pub/sub with state monitoring
- ./crates/mqtt/v3/02_serialization_formats.rs - Different serialization options
- ./crates/mqtt/v3/03_basic_publish_framed_msg.rs - Different serialization options with frame stream
Run examples:

    # --example takes the example's target name, not a file path
    cargo run --example 01_basic_publish_subscribe
    cargo run --example 02_serialization_formats
    cargo run --example 03_basic_publish_framed_msg

Testing
Run the test suite:

    # Unit tests
    cargo test --lib

    # All tests
    cargo test --all

    # With output
    cargo test -- --nocapture

Troubleshooting
Connection Refused
Error: Connection refused (os error 111)
Solution: Verify that the broker is running and that the host and port are correct.

    # Test connectivity
    telnet localhost 1883

TLS Certificate Validation Failed
Error: TLS handshake failed
Solution: Verify the CA certificate path and format (must be PEM).

    # Inspect certificate
    openssl x509 -in ca.pem -text -noout

    # Verify format
    file ca.pem  # Should be: PEM certificate

Message Too Large
Error: Serialization error: message too large
Solution: Reduce the data size or increase max_packet_size in the config.

    config.max_packet_size = Some(262144); // 256 KiB

High CPU Usage
Solution: Enable compression, reduce publish rate, or profile with perf.

    let config = SerializationConfig {
        format: SerializationFormat::MessagePack,
        compression: CompressionType::Zstd,
        compression_threshold: 512,
    };

Memory Growth
Solution: Check for message buffering or subscription memory leaks.

    # Monitor memory
    watch -n 1 'ps aux | grep mqtt'

    # Profile with valgrind
    valgrind --leak-check=full ./target/debug/your_app

Acknowledgments
Built on top of the excellent rumqttc library, which provides the underlying MQTT protocol implementation.
Support
- Examples: See the /examples directory
Changelog
v0.1.0 (Initial Release)
- Core MQTT functionality with automatic reconnection
- Multiple serialization formats (JSON, MessagePack, CBOR)
- Compression support (Gzip, Zstd)
- TLS/SSL with mutual authentication
- Comprehensive error handling
- Connection state monitoring
- Configuration management with TOML support
Roadmap
Planned for future releases:
- Message buffering with persistence
- Metrics collection (Prometheus integration)
- Circuit breaker pattern
- Multiple broker failover
- Advanced diagnostics and debugging tools
- Performance optimizations