Skip to content

NATS Protocol

FlowMQ plans to provide native NATS protocol support in future releases, allowing you to use standard NATS clients and tools to connect, publish, and consume messages. NATS in FlowMQ will map to the unified routing model for high-performance, lightweight messaging with excellent interoperability.

Overview

NATS in FlowMQ will map to the unified routing model where:

  • NATS Subjects map to Topics
  • NATS Subscriptions map to Subscriptions
  • NATS Queues provide load balancing

Connection

Connect to FlowMQ using standard NATS clients:

python
import nats

# Connect to FlowMQ
nc = await nats.connect("nats://your-namespace.flowmq.io:4222")

# With authentication
nc = await nats.connect(
    "nats://your-namespace.flowmq.io:4222",
    user="username",
    password="password"
)

Publishing Messages

Basic Publishing

python
import nats
import json

# Connect
nc = await nats.connect("nats://your-namespace.flowmq.io:4222")

# Publish simple message
await nc.publish("sensors.temperature", b"22.5")

# Publish JSON payload
payload = {
    "temperature": 22.5,
    "humidity": 65,
    "timestamp": "2024-01-15T10:30:00Z"
}
await nc.publish("sensors.room1", json.dumps(payload).encode())

Request-Reply Pattern

python
# Request with timeout
response = await nc.request("orders.process", b'{"order_id": "12345"}', timeout=5.0)
print(f"Response: {response.data.decode()}")

# Handle requests
async def order_handler(msg):
    order_data = json.loads(msg.data.decode())
    # Process order
    result = {"status": "processed", "order_id": order_data["order_id"]}
    await msg.respond(json.dumps(result).encode())

# Subscribe to requests
await nc.subscribe("orders.process", cb=order_handler)

Subscribing to Messages

Basic Subscription

python
async def message_handler(msg):
    print(f"Received: {msg.subject} -> {msg.data.decode()}")
    # Process the message

# Subscribe to specific subject
await nc.subscribe("sensors.temperature", cb=message_handler)

# Subscribe to multiple subjects
await nc.subscribe("sensors.*", cb=message_handler)

Wildcard Subscriptions

python
# Single-level wildcard
await nc.subscribe("sensors.+.temperature", cb=message_handler)
# Matches: sensors.room1.temperature, sensors.room2.temperature

# Multi-level wildcard
await nc.subscribe("sensors.>", cb=message_handler)
# Matches: sensors.temperature, sensors.room1.temperature, sensors.room1.humidity

# Multiple wildcards
await nc.subscribe("devices.+.+.status", cb=message_handler)
# Matches: devices.sensor.001.status, devices.actuator.002.status

Queue Groups (Load Balancing)

python
# Multiple subscribers in the same queue group
await nc.subscribe("orders.process", queue="order-processors", cb=message_handler)
await nc.subscribe("orders.process", queue="order-processors", cb=message_handler)

# Each message is delivered to only one subscriber in the group

Subject Structure

NATS uses dot-separated subject names:

sensors.room1.temperature
devices.iot.sensor001.status
orders.eu.new.books
alerts.critical.system.maintenance

Best Practices for Subject Design

  1. Use Hierarchical Structure: domain.location.device.measurement
  2. Be Consistent: Use the same pattern across your application
  3. Keep Subjects Short: Avoid overly long subject names
  4. Use Descriptive Names: Make subjects self-documenting

Interoperability

NATS → MQTT

Messages published via NATS can be consumed by MQTT clients:

python
# NATS Publisher
await nc.publish("sensors.temperature", b'{"value": 22.5}')
python
# MQTT Consumer
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")

client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sensors/temperature")
client.loop_forever()

MQTT → NATS

Messages published via MQTT can be consumed by NATS clients:

python
# MQTT Publisher
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("your-namespace.flowmq.io", 1883)
client.publish("sensors/temperature", '{"value": 22.5}')
python
# NATS Consumer
async def message_handler(msg):
    print(f"Received: {msg.data.decode()}")

await nc.subscribe("sensors.temperature", cb=message_handler)

NATS → AMQP

NATS messages can be consumed by AMQP clients:

python
# NATS Publisher
await nc.publish("orders.eu.new", b'{"order_id": "12345", "amount": 99.99}')
python
# AMQP Consumer
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()

channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='orders', queue='order-processing', routing_key='eu.new')

def callback(ch, method, properties, body):
    print(f"Received: {body}")

channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

NATS → Kafka

NATS messages can be consumed by Kafka clients:

python
# NATS Publisher
await nc.publish("sensors.temperature", b'{"value": 22.5, "unit": "celsius"}')
python
# Kafka Consumer
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'sensors.temperature',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='sensor-processors',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Received: {message.value}")

Subject Filter Mapping

FlowMQ will automatically map NATS subjects to MQTT-style topic filters:

NATS SubjectMapped Topic Filter
sensors.temperaturesensors/temperature
orders.eu.neworders/eu/new
devices.iot.statusdevices/iot/status

Wildcard Support

NATS wildcards will map to MQTT patterns:

NATS Subject PatternMapped Topic Filter
sensors.+.temperaturesensors/+/temperature
sensors.>sensors/#
devices.+.+.statusdevices/+/+/status

Advanced Features

Durable Subscriptions

python
# Create durable subscription
await nc.subscribe(
    "orders.process",
    durable="order-processor",
    cb=message_handler
)

# Messages are retained for durable subscribers

Message Headers

python
# Publish with headers
headers = {
    "source": "order-service",
    "version": "1.0",
    "priority": "high"
}

await nc.publish(
    "orders.new",
    b'{"order_id": "12345"}',
    headers=headers
)

JetStream (Future Feature)

python
# Publish to JetStream
js = nc.jetstream()
await js.publish("orders.new", b'{"order_id": "12345"}')

# Subscribe to JetStream
sub = await js.subscribe("orders.new", durable="order-processor")
async for msg in sub.messages:
    print(f"Received: {msg.data.decode()}")
    await msg.ack()

Error Handling

python
import nats

async def main():
    try:
        # Connect to FlowMQ
        nc = await nats.connect("nats://your-namespace.flowmq.io:4222")
        
        # Subscribe with error handling
        async def message_handler(msg):
            try:
                data = json.loads(msg.data.decode())
                # Process message
                print(f"Processed: {data}")
            except json.JSONDecodeError as e:
                print(f"Invalid JSON: {e}")
            except Exception as e:
                print(f"Processing error: {e}")
        
        await nc.subscribe("sensors.*", cb=message_handler)
        
        # Keep connection alive
        await asyncio.sleep(3600)  # 1 hour
        
    except nats.errors.ConnectionError as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        await nc.close()

# Run
asyncio.run(main())

Security

TLS Connection

python
import ssl

# Connect with TLS
nc = await nats.connect(
    "tls://your-namespace.flowmq.io:4222",
    tls=ssl.create_default_context()
)

Authentication

python
# Connect with credentials
nc = await nats.connect(
    "nats://your-namespace.flowmq.io:4222",
    user="username",
    password="password"
)

# Connect with token
nc = await nats.connect(
    "nats://your-namespace.flowmq.io:4222",
    token="your-auth-token"
)

Best Practices

  1. Use Appropriate Subject Patterns

    • Use wildcards for flexible subscriptions
    • Design hierarchical subject structure
    • Keep subjects short and descriptive
  2. Handle Request-Reply Properly

    • Set appropriate timeouts
    • Handle no-response scenarios
    • Use correlation IDs for tracking
  3. Use Queue Groups for Load Balancing

    • Distribute load across multiple subscribers
    • Ensure exactly-once delivery per group
    • Monitor group health
  4. Implement Error Handling

    • Handle connection failures
    • Process message errors gracefully
    • Implement retry logic
  5. Monitor Performance

    • Track message throughput
    • Monitor subscription health
    • Watch for memory leaks

Client Libraries

FlowMQ will work with any standard NATS client library:

  • Python: nats-py, asyncio-nats-client
  • JavaScript: nats.js, nats.ws
  • Java: nats.java, spring-nats
  • Go: nats.go
  • C#: NATS.Client
  • Rust: nats.rs

Connection Parameters

ParameterDefaultDescription
Hostyour-namespace.flowmq.ioFlowMQ service host
Port4222NATS port (TLS: 4222)
Max Reconnect60Maximum reconnection attempts
Reconnect Wait2sTime between reconnection attempts
Ping Interval2mPing interval for keep-alive

Performance Characteristics

  • High Throughput: Up to millions of messages per second
  • Low Latency: Sub-millisecond message delivery
  • Lightweight: Minimal protocol overhead
  • Scalable: Horizontal scaling with clustering

Next Steps

Future Roadmap

NATS support in FlowMQ is planned for future releases and will include:

  • Core NATS Protocol: Full NATS v2.0 compliance
  • JetStream Support: Persistent messaging with streams
  • Advanced Security: TLS, authentication, and authorization
  • Clustering: High availability and fault tolerance
  • Enhanced Interoperability: Seamless bridging with all supported protocols