Skip to content

AWS SNS Protocol

FlowMQ plans to provide native AWS SNS (Simple Notification Service) compatibility in future releases, allowing you to use standard AWS SDKs and tools to connect, publish, and subscribe to messages. SNS in FlowMQ will map to topics for pub/sub messaging with excellent interoperability.

Overview

SNS in FlowMQ will map to the unified routing model where:

  • SNS Topics map to Topics
  • SNS Subscriptions map to Subscriptions
  • SNS Message Filtering provides flexible routing

Connection

Connect to FlowMQ using standard AWS SDKs:

python
import boto3

# Configure SNS client for FlowMQ
sns = boto3.client(
    'sns',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

Publishing Messages

Basic Message Publishing

python
import boto3
import json

# Configure SNS client
sns = boto3.client(
    'sns',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Publish message to topic
response = sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({
        'order_id': '12345',
        'amount': 99.99,
        'customer_id': 'user-123'
    }),
    Subject='Order Created',
    MessageAttributes={
        'source': {
            'DataType': 'String',
            'StringValue': 'order-service'
        },
        'priority': {
            'DataType': 'String',
            'StringValue': 'high'
        }
    }
)

print(f"Message published: {response['MessageId']}")

Publishing to Multiple Topics

python
# Publish to multiple topics
topics = [
    'arn:aws:sns:us-east-1:123456789012:order-events',
    'arn:aws:sns:us-east-1:123456789012:notifications'
]

for topic_arn in topics:
    response = sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps({'order_id': '12345', 'status': 'created'}),
        Subject='Order Status Update'
    )
    print(f"Published to {topic_arn}: {response['MessageId']}")

Message Filtering

python
# Publish with message filtering
response = sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'region': 'eu', 'amount': 99.99}),
    MessageAttributes={
        'region': {
            'DataType': 'String',
            'StringValue': 'eu'
        },
        'amount': {
            'DataType': 'Number',
            'StringValue': '99.99'
        }
    }
)

Topic Management

Creating Topics

python
# Create SNS topic
response = sns.create_topic(
    Name='order-events',
    Attributes={
        'DisplayName': 'Order Events Topic',
        'Policy': json.dumps({
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Effect': 'Allow',
                    'Principal': {'Service': 'sns.amazonaws.com'},
                    'Action': 'SNS:Publish',
                    'Resource': '*'
                }
            ]
        })
    }
)

topic_arn = response['TopicArn']
print(f"Topic created: {topic_arn}")

Listing Topics

python
# List all topics
response = sns.list_topics()

for topic in response['Topics']:
    print(f"Topic ARN: {topic['TopicArn']}")

Deleting Topics

python
# Delete topic
sns.delete_topic(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events'
)

Subscriptions

Creating Subscriptions

python
# Create subscription to SQS queue
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='sqs',
    Endpoint='https://your-namespace.flowmq.io/queues/order-processing',
    Attributes={
        'FilterPolicy': json.dumps({
            'region': ['eu', 'us'],
            'amount': [{'numeric': ['>', 50]}]
        })
    }
)

subscription_arn = response['SubscriptionArn']
print(f"Subscription created: {subscription_arn}")

HTTP/HTTPS Subscriptions

python
# Create HTTP subscription
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='https',
    Endpoint='https://your-api.com/webhooks/orders',
    Attributes={
        'FilterPolicy': json.dumps({
            'priority': ['high']
        })
    }
)

Email Subscriptions

python
# Create email subscription
response = sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:notifications',
    Protocol='email',
    Endpoint='admin@yourcompany.com'
)

Interoperability

SNS → MQTT

Messages published via SNS can be consumed by MQTT clients:

python
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:sensor-data',
    Message=json.dumps({'temperature': 22.5, 'humidity': 65}),
    Subject='Sensor Reading'
)
python
# MQTT Consumer
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")

client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sns/sensor-data")
client.loop_forever()

MQTT → SNS

Messages published via MQTT can be consumed by SNS subscribers:

python
# MQTT Publisher
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("your-namespace.flowmq.io", 1883)
client.publish("sns/order-events", '{"order_id": "12345", "status": "created"}')
python
# SNS Consumer (via SQS)
import boto3

sqs = boto3.client(
    'sqs',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1'
)

response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20
)

for message in response.get('Messages', []):
    print(f"Received: {message['Body']}")
    # Process SNS message

SNS → AMQP

SNS messages can be consumed by AMQP clients:

python
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'amount': 99.99}),
    Subject='Order Created'
)
python
# AMQP Consumer
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()

channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='sns', queue='order-processing', routing_key='order-events')

def callback(ch, method, properties, body):
    print(f"Received: {body}")

channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

SNS → Kafka

SNS messages can be consumed by Kafka clients:

python
# SNS Publisher
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:user-events',
    Message=json.dumps({'user_id': '123', 'action': 'login'}),
    Subject='User Activity'
)
python
# Kafka Consumer
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'sns.user-events',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='user-event-processors',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Received: {message.value}")

Message Filtering

Attribute-Based Filtering

python
# Publish with attributes for filtering
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({'order_id': '12345', 'amount': 99.99}),
    MessageAttributes={
        'region': {
            'DataType': 'String',
            'StringValue': 'eu'
        },
        'priority': {
            'DataType': 'String',
            'StringValue': 'high'
        },
        'amount': {
            'DataType': 'Number',
            'StringValue': '99.99'
        }
    }
)

# Subscribe with filter policy
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Protocol='sqs',
    Endpoint='https://your-namespace.flowmq.io/queues/high-priority-orders',
    Attributes={
        'FilterPolicy': json.dumps({
            'priority': ['high'],
            'amount': [{'numeric': ['>', 50]}]
        })
    }
)

Error Handling

python
import boto3
from botocore.exceptions import ClientError

try:
    # Publish message
    response = sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
        Message=json.dumps({'order_id': '12345'}),
        Subject='Order Created'
    )
    print(f"Message published: {response['MessageId']}")
    
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'TopicNotFound':
        print("Topic does not exist")
    elif error_code == 'InvalidParameter':
        print("Invalid message format")
    else:
        print(f"SNS error: {e}")

Best Practices

  1. Use Message Attributes

    • Add metadata for filtering
    • Enable targeted subscriptions
    • Improve message routing
  2. Implement Message Filtering

    • Use filter policies for subscriptions
    • Reduce unnecessary message delivery
    • Improve system efficiency
  3. Handle Message Delivery

    • Monitor delivery status
    • Implement retry logic
    • Use dead letter queues for failed messages
  4. Use Appropriate Protocols

    • SQS for reliable processing
    • HTTP/HTTPS for webhooks
    • Email for notifications
    • Lambda for serverless processing
  5. Monitor and Log

    • Track message delivery metrics
    • Monitor subscription health
    • Log message processing

Client Libraries

FlowMQ will work with any standard AWS SDK:

  • Python: boto3, aioboto3
  • JavaScript: aws-sdk, @aws-sdk/client-sns
  • Java: aws-java-sdk-sns
  • Go: github.com/aws/aws-sdk-go
  • C#: AWSSDK.SNS
  • Rust: rusoto_sns

Connection Parameters

ParameterDefaultDescription
Endpoint URLhttps://your-namespace.flowmq.ioFlowMQ SNS endpoint
Regionus-east-1AWS region (for compatibility)
ProtocolhttpsDefault subscription protocol
Message Size256KBMaximum message size
Topic Limit100,000Maximum topics per account

Next Steps

Future Roadmap

SNS support in FlowMQ is planned for future releases and will include:

  • Full SNS Compatibility: Complete AWS SNS API support
  • Message Filtering: Advanced attribute-based filtering
  • Multiple Protocols: SQS, HTTP/HTTPS, Email, Lambda
  • Fan-out Delivery: One-to-many message distribution
  • Enhanced Interoperability: Seamless bridging with all supported protocols