AWS SNS Protocol
FlowMQ plans to provide native AWS SNS (Simple Notification Service) compatibility in a future release, so that you can use standard AWS SDKs and tools to connect, publish, and subscribe. SNS topics will map to FlowMQ topics for pub/sub messaging, and messages published through SNS will be consumable from the other protocols FlowMQ supports, such as MQTT, AMQP, and Kafka.
Overview
SNS in FlowMQ will map to the unified routing model where:
- SNS Topics map to Topics
- SNS Subscriptions map to Subscriptions
- SNS Message Filtering provides flexible routing
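To make the topic mapping concrete, here is a minimal sketch that extracts the topic name from an SNS topic ARN and derives the protocol-side names used by the interoperability examples on this page (`sns/<name>` for MQTT, `sns.<name>` for Kafka, and the bare name as the AMQP routing key). The helpers `parse_topic_arn` and `protocol_side_names` are hypothetical, and the naming scheme is inferred from this page's examples, not from a confirmed FlowMQ specification.

```python
from typing import NamedTuple


class TopicArn(NamedTuple):
    partition: str
    region: str
    account_id: str
    name: str


def parse_topic_arn(arn: str) -> TopicArn:
    """Split an ARN of the form arn:<partition>:sns:<region>:<account-id>:<topic-name>."""
    parts = arn.split(":")
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sns":
        raise ValueError(f"not an SNS topic ARN: {arn!r}")
    _, partition, _, region, account_id, name = parts
    return TopicArn(partition, region, account_id, name)


def protocol_side_names(arn: str) -> dict:
    """Names a consumer on another protocol would use (assumed from this page's examples)."""
    name = parse_topic_arn(arn).name
    return {
        "mqtt_topic": f"sns/{name}",
        "kafka_topic": f"sns.{name}",
        "amqp_routing_key": name,
    }


print(protocol_side_names("arn:aws:sns:us-east-1:123456789012:order-events"))
```

Rejecting anything that is not a six-part SNS ARN keeps subscription ARNs, which carry an extra trailing segment, from being mistaken for topics.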
Connection
Connect to FlowMQ using standard AWS SDKs:
import boto3
# Configure SNS client for FlowMQ
sns = boto3.client(
    'sns',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
Publishing Messages
Basic Message Publishing
import boto3
import json
# Configure SNS client
sns = boto3.client(
    'sns',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Publish message to topic
response = sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({
        'order_id': '12345',
        'amount': 99.99,
        'customer_id': 'user-123'
    }),
    Subject='Order Created',
    MessageAttributes={
        'source': {
            'DataType': 'String',
            'StringValue': 'order-service'
        },
        'priority': {
            'DataType': 'String',
            'StringValue': 'high'
        }
    }
)
print(f"Message published: {response['MessageId']}")
Publishing to Multiple Topics
# Publish to multiple topics
topics = [
    'arn:aws:sns:us-east-1:123456789012:order-events',
    'arn:aws:sns:us-east-1:123456789012:notifications'
]
for topic_arn in topics:
    response = sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps({'order_id': '12345', 'status': 'created'}),
        Subject='Order Status Update'
    )
    print(f"Published to {topic_arn}: {response['MessageId']}")
Message Filtering
# Publish with message attributes that subscription filter policies can match on
response = sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'region': 'eu', 'amount': 99.99}),
    MessageAttributes={
        'region': {
            'DataType': 'String',
            'StringValue': 'eu'
        },
        'amount': {
            'DataType': 'Number',
            'StringValue': '99.99'
        }
    }
)
Topic Management
Creating Topics
# Create SNS topic
response = sns.create_topic(
    Name='order-events',
    Attributes={
        'DisplayName': 'Order Events Topic',
        'Policy': json.dumps({
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Effect': 'Allow',
                    'Principal': {'Service': 'sns.amazonaws.com'},
                    'Action': 'SNS:Publish',
                    'Resource': '*'
                }
            ]
        })
    }
)
topic_arn = response['TopicArn']
print(f"Topic created: {topic_arn}")
Listing Topics
# List all topics
response = sns.list_topics()
for topic in response['Topics']:
    print(f"Topic ARN: {topic['TopicArn']}")
Deleting Topics
# Delete topic
sns.delete_topic(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events'
)
Subscriptions
Creating Subscriptions
# Create subscription to SQS queue
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='sqs',
    Endpoint='https://your-namespace.flowmq.io/queues/order-processing',
    Attributes={
        'FilterPolicy': json.dumps({
            'region': ['eu', 'us'],
            'amount': [{'numeric': ['>', 50]}]
        })
    }
)
subscription_arn = response['SubscriptionArn']
print(f"Subscription created: {subscription_arn}")
HTTP/HTTPS Subscriptions
# Create HTTPS subscription
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='https',
    Endpoint='https://your-api.com/webhooks/orders',
    Attributes={
        'FilterPolicy': json.dumps({
            'priority': ['high']
        })
    }
)
Email Subscriptions
# Create email subscription
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:notifications',
    Protocol='email',
    Endpoint='admin@yourcompany.com'
)
Interoperability
SNS → MQTT
Messages published via SNS can be consumed by MQTT clients:
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:sensor-data',
    Message=json.dumps({'temperature': 22.5, 'humidity': 65}),
    Subject='Sensor Reading'
)
# MQTT Consumer
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")
# paho-mqtt 1.x style; with paho-mqtt 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sns/sensor-data")
client.loop_forever()
MQTT → SNS
Messages published via MQTT can be consumed by SNS subscribers:
# MQTT Publisher
import paho.mqtt.client as mqtt
# paho-mqtt 1.x style; with paho-mqtt 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client = mqtt.Client()
client.connect("your-namespace.flowmq.io", 1883)
client.loop_start()  # run the network loop so the publish is actually sent
info = client.publish("sns/order-events", '{"order_id": "12345", "status": "created"}')
info.wait_for_publish()
client.loop_stop()
client.disconnect()
# SNS Consumer (via SQS)
import boto3
sqs = boto3.client(
    'sqs',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1'
)
response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20
)
for message in response.get('Messages', []):
    print(f"Received: {message['Body']}")
    # Process SNS message
SNS → AMQP
SNS messages can be consumed by AMQP clients:
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'amount': 99.99}),
    Subject='Order Created'
)
# AMQP Consumer
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()
channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='sns', queue='order-processing', routing_key='order-events')
def callback(ch, method, properties, body):
    print(f"Received: {body}")
channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
SNS → Kafka
SNS messages can be consumed by Kafka clients:
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:user-events',
    Message=json.dumps({'user_id': '123', 'action': 'login'}),
    Subject='User Activity'
)
# Kafka Consumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'sns.user-events',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='user-event-processors',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
    print(f"Received: {message.value}")
Message Filtering
Attribute-Based Filtering
# Publish with attributes for filtering
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'amount': 99.99}),
    MessageAttributes={
        'region': {
            'DataType': 'String',
            'StringValue': 'eu'
        },
        'priority': {
            'DataType': 'String',
            'StringValue': 'high'
        },
        'amount': {
            'DataType': 'Number',
            'StringValue': '99.99'
        }
    }
)
# Subscribe with filter policy
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='sqs',
    Endpoint='https://your-namespace.flowmq.io/queues/high-priority-orders',
    Attributes={
        'FilterPolicy': json.dumps({
            'priority': ['high'],
            'amount': [{'numeric': ['>', 50]}]
        })
    }
)
Error Handling
import json
import boto3
from botocore.exceptions import ClientError
# Assumes `sns` is the client configured in the Connection section
try:
    # Publish message
    response = sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
        Message=json.dumps({'order_id': '12345'}),
        Subject='Order Created'
    )
    print(f"Message published: {response['MessageId']}")
except ClientError as e:
    error_code = e.response['Error']['Code']
    # AWS SNS reports a missing topic with the error code 'NotFound'
    if error_code == 'NotFound':
        print("Topic does not exist")
    elif error_code == 'InvalidParameter':
        print("Invalid message format")
    else:
        print(f"SNS error: {e}")
Best Practices
Use Message Attributes
- Add metadata for filtering
- Enable targeted subscriptions
- Improve message routing
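As a small illustration of attaching metadata, the sketch below converts a plain Python dict into the `MessageAttributes` structure used in the publish examples on this page. The helper name `to_message_attributes` is hypothetical, and it only covers the `String` and `Number` data types shown in those examples.

```python
def to_message_attributes(metadata: dict) -> dict:
    """Build an SNS MessageAttributes dict from simple key/value metadata."""
    attributes = {}
    for key, value in metadata.items():
        # bool is a subclass of int in Python, so check it first and send it as a string.
        if isinstance(value, bool):
            data_type, text = "String", str(value).lower()
        elif isinstance(value, (int, float)):
            data_type, text = "Number", str(value)
        else:
            data_type, text = "String", str(value)
        attributes[key] = {"DataType": data_type, "StringValue": text}
    return attributes


attributes = to_message_attributes({"region": "eu", "priority": "high", "amount": 99.99})
print(attributes)
```

The result can be passed directly as the `MessageAttributes` argument of `sns.publish`.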
Implement Message Filtering
- Use filter policies for subscriptions
- Reduce unnecessary message delivery
- Improve system efficiency
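To reason about which messages a filter policy would let through, it can help to evaluate the policy locally before deploying it. The following is a simplified sketch that handles only the two rule forms used on this page (exact string values and `numeric` comparisons); it is not the full AWS filter policy grammar, and `matches_filter_policy` is a hypothetical helper, not part of any SDK.

```python
import operator

# Comparison operators allowed inside a {"numeric": [...]} rule.
_NUMERIC_OPS = {
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def _rule_matches(rule, attribute: dict) -> bool:
    """Check one rule against one message attribute ({'DataType', 'StringValue'})."""
    data_type = attribute["DataType"]
    raw = attribute["StringValue"]
    if isinstance(rule, dict) and "numeric" in rule:
        if data_type != "Number":
            return False
        number = float(raw)
        conditions = rule["numeric"]  # e.g. ['>', 50] or ['>', 0, '<=', 100]
        pairs = zip(conditions[0::2], conditions[1::2])
        return all(_NUMERIC_OPS[op](number, bound) for op, bound in pairs)
    if isinstance(rule, str):
        return data_type == "String" and raw == rule
    return False


def matches_filter_policy(policy: dict, message_attributes: dict) -> bool:
    """Every policy key must match (AND); any rule listed under a key may match (OR)."""
    for key, rules in policy.items():
        attribute = message_attributes.get(key)
        if attribute is None:
            return False
        if not any(_rule_matches(rule, attribute) for rule in rules):
            return False
    return True
```

For example, the policy `{'priority': ['high'], 'amount': [{'numeric': ['>', 50]}]}` accepts a message only when it carries both a `priority` attribute equal to `high` and a numeric `amount` above 50.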
Handle Message Delivery
- Monitor delivery status
- Implement retry logic
- Use dead letter queues for failed messages
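One way to implement retry logic on the publishing side is exponential backoff with jitter around the publish call. The sketch below is library-agnostic: `publish_with_retry` is a hypothetical helper that takes any zero-argument callable, and the attempt count and delays are illustrative defaults rather than FlowMQ recommendations. Note that boto3 also applies its own configurable retries, so an explicit wrapper like this is only one option.

```python
import random
import time


def publish_with_retry(publish, max_attempts=4, base_delay=0.5,
                       retryable=(Exception,), sleep=time.sleep):
    """Call publish() until it succeeds or max_attempts is reached.

    Waits base_delay * 2**(attempt - 1) seconds plus up to 50% random jitter
    between attempts, and re-raises the last error when attempts are exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return publish()
        except retryable:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))


# Usage with the client from the Connection section (not executed here):
# response = publish_with_retry(
#     lambda: sns.publish(TopicArn=topic_arn, Message='{"order_id": "12345"}')
# )
```

In real use, narrowing `retryable` to transient errors (for example throttling or connection failures) avoids retrying requests that can never succeed, such as publishing to a topic that does not exist.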
Use Appropriate Protocols
- SQS for reliable processing
- HTTP/HTTPS for webhooks
- Email for notifications
- Lambda for serverless processing
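When SQS is the subscription protocol, AWS SNS by default wraps each notification in a JSON envelope whose `Message` field holds the published payload, unless raw message delivery is enabled on the subscription. Assuming FlowMQ follows the same convention (this page does not confirm it), a consumer could unwrap the body as sketched below. `unwrap_sns_envelope` is a hypothetical helper and assumes JSON payloads like the ones published in this page's examples.

```python
import json


def unwrap_sns_envelope(sqs_body: str):
    """Return (payload, attributes) from an SQS message body delivered by SNS.

    Handles both the default envelope format and raw message delivery,
    where the body is the published payload itself.
    """
    decoded = json.loads(sqs_body)
    is_envelope = (
        isinstance(decoded, dict)
        and decoded.get("Type") == "Notification"
        and "Message" in decoded
    )
    if not is_envelope:
        # Raw delivery: the body already is the payload, and attributes are not embedded.
        return decoded, {}
    payload = json.loads(decoded["Message"])
    # Inside the envelope, each attribute is encoded as {"Type": ..., "Value": ...}.
    attributes = {
        name: attribute.get("Value")
        for name, attribute in decoded.get("MessageAttributes", {}).items()
    }
    return payload, attributes
```

Inside a receive loop this would be called as `payload, attributes = unwrap_sns_envelope(message['Body'])`.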
Monitor and Log
- Track message delivery metrics
- Monitor subscription health
- Log message processing
Client Libraries
FlowMQ will work with any standard AWS SDK:
- Python: boto3, aioboto3
- JavaScript: aws-sdk, @aws-sdk/client-sns
- Java: aws-java-sdk-sns
- Go: github.com/aws/aws-sdk-go
- C#: AWSSDK.SNS
- Rust: rusoto_sns
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Endpoint URL | https://your-namespace.flowmq.io | FlowMQ SNS endpoint |
| Region | us-east-1 | AWS region (for compatibility) |
| Protocol | https | Default subscription protocol |
| Message Size | 256 KB | Maximum message size |
| Topic Limit | 100,000 | Maximum topics per account |
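Because the table lists a maximum message size, a publisher may want to check payloads before sending them. The sketch below measures the UTF-8 encoded size of the message body against a 256 KB limit, interpreted here as 262,144 bytes. That interpretation, and the choice to measure only the body and not any message attributes, are assumptions; `check_message_size` is a hypothetical helper.

```python
import json

# Assumed interpretation of the 256 KB limit from the table above.
MAX_MESSAGE_BYTES = 256 * 1024


def check_message_size(message: str, limit: int = MAX_MESSAGE_BYTES) -> int:
    """Return the UTF-8 size of message in bytes, or raise ValueError if it exceeds limit."""
    size = len(message.encode("utf-8"))
    if size > limit:
        raise ValueError(f"message is {size} bytes, limit is {limit} bytes")
    return size


body = json.dumps({"order_id": "12345", "amount": 99.99})
print(check_message_size(body))
```

Measuring encoded bytes instead of `len(message)` matters for non-ASCII text, where one character can occupy several bytes.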
Next Steps
- Learn about SQS Protocol
- Explore MQTT Protocol
- Check AMQP Protocol
Future Roadmap
SNS support in FlowMQ is planned for future releases and will include:
- Full SNS Compatibility: Complete AWS SNS API support
- Message Filtering: Advanced attribute-based filtering
- Multiple Protocols: SQS, HTTP/HTTPS, Email, Lambda
- Fan-out Delivery: One-to-many message distribution
- Enhanced Interoperability: Seamless bridging with all supported protocols