NATS Protocol
FlowMQ plans to provide native NATS protocol support in future releases, allowing you to use standard NATS clients and tools to connect, publish, and consume messages. NATS in FlowMQ will map to the unified routing model for high-performance, lightweight messaging with excellent interoperability.
Overview
NATS in FlowMQ will map to the unified routing model where:
- NATS Subjects map to Topics
- NATS Subscriptions map to Subscriptions
- NATS Queues provide load balancing
Connection
Connect to FlowMQ using standard NATS clients:
import nats
# Connect to FlowMQ
nc = await nats.connect("nats://your-namespace.flowmq.io:4222")
# With authentication
nc = await nats.connect(
"nats://your-namespace.flowmq.io:4222",
user="username",
password="password"
)Publishing Messages
Basic Publishing
import nats
import json
# Connect
nc = await nats.connect("nats://your-namespace.flowmq.io:4222")
# Publish simple message
await nc.publish("sensors.temperature", b"22.5")
# Publish JSON payload
payload = {
"temperature": 22.5,
"humidity": 65,
"timestamp": "2024-01-15T10:30:00Z"
}
await nc.publish("sensors.room1", json.dumps(payload).encode())Request-Reply Pattern
# Request with timeout
response = await nc.request("orders.process", b'{"order_id": "12345"}', timeout=5.0)
print(f"Response: {response.data.decode()}")
# Handle requests
async def order_handler(msg):
order_data = json.loads(msg.data.decode())
# Process order
result = {"status": "processed", "order_id": order_data["order_id"]}
await msg.respond(json.dumps(result).encode())
# Subscribe to requests
await nc.subscribe("orders.process", cb=order_handler)Subscribing to Messages
Basic Subscription
async def message_handler(msg):
print(f"Received: {msg.subject} -> {msg.data.decode()}")
# Process the message
# Subscribe to specific subject
await nc.subscribe("sensors.temperature", cb=message_handler)
# Subscribe to multiple subjects
await nc.subscribe("sensors.*", cb=message_handler)Wildcard Subscriptions
# Single-level wildcard
await nc.subscribe("sensors.+.temperature", cb=message_handler)
# Matches: sensors.room1.temperature, sensors.room2.temperature
# Multi-level wildcard
await nc.subscribe("sensors.>", cb=message_handler)
# Matches: sensors.temperature, sensors.room1.temperature, sensors.room1.humidity
# Multiple wildcards
await nc.subscribe("devices.+.+.status", cb=message_handler)
# Matches: devices.sensor.001.status, devices.actuator.002.statusQueue Groups (Load Balancing)
# Multiple subscribers in the same queue group
await nc.subscribe("orders.process", queue="order-processors", cb=message_handler)
await nc.subscribe("orders.process", queue="order-processors", cb=message_handler)
# Each message is delivered to only one subscriber in the groupSubject Structure
NATS uses dot-separated subject names:
sensors.room1.temperature
devices.iot.sensor001.status
orders.eu.new.books
alerts.critical.system.maintenanceBest Practices for Subject Design
- Use Hierarchical Structure:
domain.location.device.measurement - Be Consistent: Use the same pattern across your application
- Keep Subjects Short: Avoid overly long subject names
- Use Descriptive Names: Make subjects self-documenting
Interoperability
NATS → MQTT
Messages published via NATS can be consumed by MQTT clients:
# NATS Publisher
await nc.publish("sensors.temperature", b'{"value": 22.5}')# MQTT Consumer
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
print(f"Received: {msg.payload.decode()}")
client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sensors/temperature")
client.loop_forever()MQTT → NATS
Messages published via MQTT can be consumed by NATS clients:
# MQTT Publisher
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.connect("your-namespace.flowmq.io", 1883)
client.publish("sensors/temperature", '{"value": 22.5}')# NATS Consumer
async def message_handler(msg):
print(f"Received: {msg.data.decode()}")
await nc.subscribe("sensors.temperature", cb=message_handler)NATS → AMQP
NATS messages can be consumed by AMQP clients:
# NATS Publisher
await nc.publish("orders.eu.new", b'{"order_id": "12345", "amount": 99.99}')# AMQP Consumer
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()
channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='orders', queue='order-processing', routing_key='eu.new')
def callback(ch, method, properties, body):
print(f"Received: {body}")
channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()NATS → Kafka
NATS messages can be consumed by Kafka clients:
# NATS Publisher
await nc.publish("sensors.temperature", b'{"value": 22.5, "unit": "celsius"}')# Kafka Consumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'sensors.temperature',
bootstrap_servers=['your-namespace.flowmq.io:9092'],
group_id='sensor-processors',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(f"Received: {message.value}")Subject Filter Mapping
FlowMQ will automatically map NATS subjects to MQTT-style topic filters:
| NATS Subject | Mapped Topic Filter |
|---|---|
sensors.temperature | sensors/temperature |
orders.eu.new | orders/eu/new |
devices.iot.status | devices/iot/status |
Wildcard Support
NATS wildcards will map to MQTT patterns:
| NATS Subject Pattern | Mapped Topic Filter |
|---|---|
sensors.+.temperature | sensors/+/temperature |
sensors.> | sensors/# |
devices.+.+.status | devices/+/+/status |
Advanced Features
Durable Subscriptions
# Create durable subscription
await nc.subscribe(
"orders.process",
durable="order-processor",
cb=message_handler
)
# Messages are retained for durable subscribersMessage Headers
# Publish with headers
headers = {
"source": "order-service",
"version": "1.0",
"priority": "high"
}
await nc.publish(
"orders.new",
b'{"order_id": "12345"}',
headers=headers
)JetStream (Future Feature)
# Publish to JetStream
js = nc.jetstream()
await js.publish("orders.new", b'{"order_id": "12345"}')
# Subscribe to JetStream
sub = await js.subscribe("orders.new", durable="order-processor")
async for msg in sub.messages:
print(f"Received: {msg.data.decode()}")
await msg.ack()Error Handling
import nats
async def main():
try:
# Connect to FlowMQ
nc = await nats.connect("nats://your-namespace.flowmq.io:4222")
# Subscribe with error handling
async def message_handler(msg):
try:
data = json.loads(msg.data.decode())
# Process message
print(f"Processed: {data}")
except json.JSONDecodeError as e:
print(f"Invalid JSON: {e}")
except Exception as e:
print(f"Processing error: {e}")
await nc.subscribe("sensors.*", cb=message_handler)
# Keep connection alive
await asyncio.sleep(3600) # 1 hour
except nats.errors.ConnectionError as e:
print(f"Connection error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
await nc.close()
# Run
asyncio.run(main())Security
TLS Connection
import ssl
# Connect with TLS
nc = await nats.connect(
"tls://your-namespace.flowmq.io:4222",
tls=ssl.create_default_context()
)Authentication
# Connect with credentials
nc = await nats.connect(
"nats://your-namespace.flowmq.io:4222",
user="username",
password="password"
)
# Connect with token
nc = await nats.connect(
"nats://your-namespace.flowmq.io:4222",
token="your-auth-token"
)Best Practices
Use Appropriate Subject Patterns
- Use wildcards for flexible subscriptions
- Design hierarchical subject structure
- Keep subjects short and descriptive
Handle Request-Reply Properly
- Set appropriate timeouts
- Handle no-response scenarios
- Use correlation IDs for tracking
Use Queue Groups for Load Balancing
- Distribute load across multiple subscribers
- Ensure exactly-once delivery per group
- Monitor group health
Implement Error Handling
- Handle connection failures
- Process message errors gracefully
- Implement retry logic
Monitor Performance
- Track message throughput
- Monitor subscription health
- Watch for memory leaks
Client Libraries
FlowMQ will work with any standard NATS client library:
- Python:
nats-py,asyncio-nats-client - JavaScript:
nats.js,nats.ws - Java:
nats.java,spring-nats - Go:
nats.go - C#:
NATS.Client - Rust:
nats.rs
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Host | your-namespace.flowmq.io | FlowMQ service host |
| Port | 4222 | NATS port (TLS: 4222) |
| Max Reconnect | 60 | Maximum reconnection attempts |
| Reconnect Wait | 2s | Time between reconnection attempts |
| Ping Interval | 2m | Ping interval for keep-alive |
Performance Characteristics
- High Throughput: Up to millions of messages per second
- Low Latency: Sub-millisecond message delivery
- Lightweight: Minimal protocol overhead
- Scalable: Horizontal scaling with clustering
Next Steps
- Learn about MQTT Protocol
- Explore AMQP Protocol
- Check Kafka Protocol
Future Roadmap
NATS support in FlowMQ is planned for future releases and will include:
- Core NATS Protocol: Full NATS v2.0 compliance
- JetStream Support: Persistent messaging with streams
- Advanced Security: TLS, authentication, and authorization
- Clustering: High availability and fault tolerance
- Enhanced Interoperability: Seamless bridging with all supported protocols