AWS SQS Protocol

FlowMQ plans to provide native AWS SQS (Simple Queue Service) compatibility in a future release, so that standard AWS SDKs and tools can connect, send, and receive messages. SQS queues will map to FlowMQ queues, which means messages sent over SQS can also be consumed through the other protocols FlowMQ supports, such as MQTT and AMQP.

Overview

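SQS will map onto FlowMQ's unified routing model as follows:

  • SQS Queues map to FlowMQ Queues
  • SQS Messages stay in the queue until a consumer deletes them, which provides reliable delivery
  • SQS Visibility Timeout hides a received message from other consumers while it is being processed; if the message is not deleted in time, it becomes visible again and is redelivered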

Connection

Connect to FlowMQ using standard AWS SDKs:

python
import boto3

# Configure SQS client for FlowMQ
sqs = boto3.client(
    'sqs',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Queue URL: the endpoint followed by /queues/<queue-name>
queue_url = 'https://your-namespace.flowmq.io/queues/order-processing'
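
The connection settings above are repeated in every example on this page. As a minimal sketch, they could be built in one place and the credentials read from the environment. The variable names FLOWMQ_ACCESS_KEY and FLOWMQ_SECRET_KEY and both helper functions are assumptions made for illustration; they are not part of FlowMQ or boto3.

```python
import os


def flowmq_client_kwargs(namespace, env=os.environ):
    """Build keyword arguments for boto3.client('sqs', **kwargs).

    FLOWMQ_ACCESS_KEY and FLOWMQ_SECRET_KEY are hypothetical variable names.
    """
    return {
        "endpoint_url": f"https://{namespace}.flowmq.io",
        "region_name": env.get("AWS_DEFAULT_REGION", "us-east-1"),
        "aws_access_key_id": env["FLOWMQ_ACCESS_KEY"],
        "aws_secret_access_key": env["FLOWMQ_SECRET_KEY"],
    }


def queue_url(namespace, queue_name):
    """Build a queue URL using the pattern shown in the example above."""
    return f"https://{namespace}.flowmq.io/queues/{queue_name}"
```

With boto3 installed, the client would then be created with `boto3.client('sqs', **flowmq_client_kwargs('your-namespace'))`.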

Sending Messages

Basic Message Sending

python
import boto3
import json

# Configure SQS client
sqs = boto3.client(
    'sqs',
    endpoint_url='https://your-namespace.flowmq.io',
    region_name='us-east-1',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Send message
response = sqs.send_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MessageBody=json.dumps({
        'order_id': '12345',
        'amount': 99.99,
        'customer_id': 'user-123'
    }),
    MessageAttributes={
        'source': {
            'StringValue': 'order-service',
            'DataType': 'String'
        },
        'priority': {
            'StringValue': 'high',
            'DataType': 'String'
        }
    }
)

print(f"Message sent: {response['MessageId']}")

Batch Message Sending

python
# Send multiple messages in batch
messages = [
    {
        'Id': 'msg1',
        'MessageBody': json.dumps({'order_id': '12345', 'amount': 99.99})
    },
    {
        'Id': 'msg2',
        'MessageBody': json.dumps({'order_id': '12346', 'amount': 149.99})
    },
    {
        'Id': 'msg3',
        'MessageBody': json.dumps({'order_id': '12347', 'amount': 79.99})
    }
]

response = sqs.send_message_batch(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    Entries=messages
)

# 'Successful' and 'Failed' may be omitted from the response when empty
print(f"Batch sent: {len(response.get('Successful', []))} successful, {len(response.get('Failed', []))} failed")
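
AWS SQS accepts at most 10 entries per SendMessageBatch call, so larger workloads have to be split into chunks. The following is a minimal sketch of one way to do that, assuming FlowMQ applies the same limit. The helper name send_in_batches and the generated entry Ids are illustrative only.

```python
import json

MAX_BATCH = 10  # AWS SQS limit on entries per SendMessageBatch call


def send_in_batches(sqs, queue_url, payloads):
    """Send payloads in chunks of up to 10 and return the Ids of failed entries."""
    failed = []
    for start in range(0, len(payloads), MAX_BATCH):
        chunk = payloads[start:start + MAX_BATCH]
        entries = [
            {"Id": f"msg{start + i}", "MessageBody": json.dumps(payload)}
            for i, payload in enumerate(chunk)
        ]
        response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
        # The 'Failed' key may be absent when every entry succeeded
        failed.extend(item["Id"] for item in response.get("Failed", []))
    return failed
```

The function takes the client as an argument, so it works with the `sqs` client created earlier; the returned Ids can be used to decide which payloads to resend.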

Delayed Messages

python
# Send message with delay (up to 15 minutes)
response = sqs.send_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MessageBody=json.dumps({'order_id': '12345'}),
    DelaySeconds=300  # 5 minutes delay
)

Receiving Messages

Basic Message Receiving

python
# Receive messages
response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,  # Long polling
    VisibilityTimeout=300  # 5 minutes visibility timeout
)

messages = response.get('Messages', [])
for message in messages:
    print(f"Received: {message['Body']}")
    print(f"Message ID: {message['MessageId']}")
    print(f"Receipt Handle: {message['ReceiptHandle']}")
    
    # Process the message (process_order is your own handler function)
    process_order(json.loads(message['Body']))
    
    # Delete the message after processing
    sqs.delete_message(
        QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
        ReceiptHandle=message['ReceiptHandle']
    )
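
The loop above deletes a message right after processing it, but it does not show what happens when processing fails. A minimal sketch of the usual pattern, deleting only after the handler succeeds and otherwise leaving the message for redelivery once the visibility timeout expires, might look like this. The function name poll_once and the handler argument are hypothetical.

```python
import json


def poll_once(sqs, queue_url, handler, wait_seconds=20):
    """Receive one batch and delete each message only if handler succeeds."""
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=wait_seconds,
    )
    processed = 0
    for message in response.get("Messages", []):
        try:
            handler(json.loads(message["Body"]))
        except Exception as exc:
            # Do not delete: the message becomes visible again after the
            # visibility timeout and can be retried.
            print(f"Processing failed for {message['MessageId']}: {exc}")
            continue
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
        processed += 1
    return processed
```

Calling `poll_once(sqs, queue_url, process_order)` in a `while True:` loop gives a simple consumer.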

Long Polling

python
# Use long polling for efficient message receiving
response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,  # Long polling (up to 20 seconds)
    VisibilityTimeout=300
)

Message Attributes

python
# Receive messages with attributes
response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    MessageAttributeNames=['All'],
    AttributeNames=['All']
)

for message in response.get('Messages', []):
    print(f"Body: {message['Body']}")
    print(f"Attributes: {message.get('MessageAttributes', {})}")
    print(f"System Attributes: {message.get('Attributes', {})}")
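
Received message attributes arrive as nested dictionaries with a DataType and a StringValue or BinaryValue. As a sketch, a small helper can flatten them into a plain dictionary that is easier to work with. The name flatten_attributes is an assumption for illustration, and the number parsing is deliberately simple.

```python
def flatten_attributes(message):
    """Convert SQS MessageAttributes into a plain {name: value} dict."""
    flat = {}
    for name, attr in message.get("MessageAttributes", {}).items():
        data_type = attr.get("DataType", "String")
        if data_type.startswith("Binary"):
            flat[name] = attr.get("BinaryValue")
        elif data_type.startswith("Number"):
            # Number attributes are transmitted as strings
            text = attr["StringValue"]
            is_float = "." in text or "e" in text.lower()
            flat[name] = float(text) if is_float else int(text)
        else:
            flat[name] = attr.get("StringValue")
    return flat
```

For the message sent in the Basic Message Sending example, this would yield a dictionary with the keys `source` and `priority`.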

Queue Management

Creating Queues

python
# Create standard queue
response = sqs.create_queue(
    QueueName='order-processing',
    Attributes={
        'VisibilityTimeout': '300',  # 5 minutes
        'MessageRetentionPeriod': '1209600',  # 14 days
        'MaximumMessageSize': '262144',  # 256 KB
        'DelaySeconds': '0',
        'ReceiveMessageWaitTimeSeconds': '20'  # Long polling
    }
)

queue_url = response['QueueUrl']
print(f"Queue created: {queue_url}")

Creating FIFO Queues

python
# Create FIFO queue for ordered message processing
response = sqs.create_queue(
    QueueName='order-processing.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true',
        'VisibilityTimeout': '300'
    }
)
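
In AWS SQS, every message sent to a FIFO queue needs a MessageGroupId, and a MessageDeduplicationId is required unless ContentBasedDeduplication is enabled on the queue. Assuming FlowMQ follows the same rules, the send arguments could be assembled as sketched here. The helper fifo_send_kwargs is hypothetical.

```python
import json


def fifo_send_kwargs(queue_url, payload, group_id, dedup_id=None):
    """Build send_message arguments for a FIFO queue."""
    kwargs = {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(payload),
        # Messages that share a group ID are delivered in order
        "MessageGroupId": group_id,
    }
    if dedup_id is not None:
        # Only needed when ContentBasedDeduplication is not enabled
        kwargs["MessageDeduplicationId"] = dedup_id
    return kwargs
```

The result is passed straight to the client, for example `sqs.send_message(**fifo_send_kwargs(queue_url, {'order_id': '12345'}, group_id='customer-123'))`.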

Listing Queues

python
# List all queues
response = sqs.list_queues(
    QueueNamePrefix='order'  # Optional prefix filter
)

for queue_url in response.get('QueueUrls', []):
    print(f"Queue: {queue_url}")
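
When there are many queues, AWS SQS returns the list in pages: passing MaxResults makes the response include a NextToken whenever more results remain. A minimal sketch of following that token, assuming FlowMQ implements the same pagination, is shown here. The generator name iter_queue_urls is illustrative.

```python
def iter_queue_urls(sqs, prefix=None, page_size=100):
    """Yield every queue URL, following NextToken across pages."""
    kwargs = {"MaxResults": page_size}
    if prefix:
        kwargs["QueueNamePrefix"] = prefix
    while True:
        response = sqs.list_queues(**kwargs)
        yield from response.get("QueueUrls", [])
        token = response.get("NextToken")
        if not token:
            break  # No more pages
        kwargs["NextToken"] = token
```

Usage is a plain loop: `for url in iter_queue_urls(sqs, prefix='order'): print(url)`.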

Interoperability

SQS → MQTT

Messages sent via SQS can be consumed by MQTT clients:

python
# SQS Producer
sqs.send_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MessageBody=json.dumps({'order_id': '12345', 'amount': 99.99})
)

python
# MQTT Consumer
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")

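# paho-mqtt 2.x requires the callback API version; on 1.x use mqtt.Client()
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)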
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("queues/order-processing")
client.loop_forever()

MQTT → SQS

Messages published via MQTT can be consumed by SQS clients:

python
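# MQTT Publisher
import paho.mqtt.client as mqtt

# paho-mqtt 2.x requires the callback API version; on 1.x use mqtt.Client()
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.connect("your-namespace.flowmq.io", 1883)
client.loop_start()  # Run the network loop so the publish is actually sent
info = client.publish("queues/order-processing", '{"order_id": "12345"}')
info.wait_for_publish()  # Block until the message has been sent
client.loop_stop()
client.disconnect()

python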
# SQS Consumer
response = sqs.receive_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20
)

for message in response.get('Messages', []):
    print(f"Received: {message['Body']}")
    # Process message
    sqs.delete_message(
        QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
        ReceiptHandle=message['ReceiptHandle']
    )

SQS → AMQP

SQS messages can be consumed by AMQP clients:

python
# SQS Producer
sqs.send_message(
    QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
    MessageBody=json.dumps({'order_id': '12345', 'amount': 99.99})
)

python
# AMQP Consumer
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()

channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='sqs', queue='order-processing', routing_key='order-processing')

def callback(ch, method, properties, body):
    print(f"Received: {body}")

channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Error Handling

python
import json

import boto3
from botocore.exceptions import ClientError

try:
    # Send message
    response = sqs.send_message(
        QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
        MessageBody=json.dumps({'order_id': '12345'})
    )
    print(f"Message sent: {response['MessageId']}")
    
except ClientError as e:
    error_code = e.response['Error']['Code']
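    # AWS reports a missing queue under either code, depending on the wire protocol
    if error_code in ('QueueDoesNotExist', 'AWS.SimpleQueueService.NonExistentQueue'):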
        print("Queue does not exist")
    elif error_code == 'InvalidMessageContents':
        print("Invalid message format")
    else:
        print(f"SQS error: {e}")
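
Some failures, such as throttling or a dropped connection, are transient and worth retrying. The following is a minimal sketch of retrying with exponential backoff and jitter. It is generic rather than SQS-specific: with_retries and its parameters are hypothetical, and the caller decides which exceptions count as retryable.

```python
import random
import time


def with_retries(operation, is_retryable, attempts=5, base_delay=0.5,
                 sleep=time.sleep):
    """Call operation(), retrying retryable errors with backoff and jitter."""
    for attempt in range(attempts):
        try:
            return operation()
        except Exception as exc:
            # Give up on the last attempt or on errors that cannot succeed
            if attempt == attempts - 1 or not is_retryable(exc):
                raise
            # Wait a random time up to base_delay * 2^attempt
            sleep(random.uniform(0, base_delay * 2 ** attempt))
```

A call such as `with_retries(lambda: sqs.send_message(QueueUrl=queue_url, MessageBody=body), is_retryable=my_check)` wraps a single request, where `my_check` is your own predicate. Note that botocore also has built-in retry handling that can be configured on the client, so a wrapper like this is mainly useful for application-level retries.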

Dead Letter Queues

python
# Create dead letter queue
dlq_response = sqs.create_queue(
    QueueName='order-processing-dlq',
    Attributes={
        'MessageRetentionPeriod': '1209600'  # 14 days
    }
)

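# Look up the dead letter queue's ARN (RedrivePolicy expects an ARN, not a URL)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_response['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Create main queue with dead letter queue
response = sqs.create_queue(
    QueueName='order-processing',
    Attributes={
        'VisibilityTimeout': '300',
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '3'
        })
    }
)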
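
Once the underlying problem is fixed, messages in the dead letter queue usually need to be moved back for another attempt. A minimal sketch of doing that by hand, using only receive, send, and delete calls, is given here. The function name redrive is an assumption for illustration.

```python
def redrive(sqs, dlq_url, target_url, max_messages=10):
    """Move one batch of messages from the dead letter queue to target_url."""
    response = sqs.receive_message(
        QueueUrl=dlq_url,
        MaxNumberOfMessages=max_messages,
        MessageAttributeNames=["All"],
    )
    moved = 0
    for message in response.get("Messages", []):
        kwargs = {"QueueUrl": target_url, "MessageBody": message["Body"]}
        if message.get("MessageAttributes"):
            kwargs["MessageAttributes"] = message["MessageAttributes"]
        # Send first, then delete, so a failure never loses the message
        sqs.send_message(**kwargs)
        sqs.delete_message(
            QueueUrl=dlq_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
        moved += 1
    return moved
```

Calling the function repeatedly until it returns 0 drains the dead letter queue. Because the send happens before the delete, a crash in between can produce a duplicate, so consumers should tolerate reprocessing.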

Best Practices

  1. Use Long Polling

    • Reduce API calls and costs
    • Improve message delivery latency
    • Set WaitTimeSeconds to 20, the maximum SQS allows
  2. Handle Message Visibility

    • Set appropriate visibility timeout
    • Extend timeout for long-running tasks
    • Delete messages after successful processing
  3. Use Batch Operations

    • Send/receive messages in batches
    • Reduce API call overhead
    • Improve throughput
  4. Implement Dead Letter Queues

    • Handle failed message processing
    • Monitor and retry failed messages
    • Set appropriate retry limits
  5. Use Message Attributes

    • Add metadata to messages
    • Filter or route messages in consumers based on attributes
    • Track message processing
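
For the second practice, extending the timeout for long-running tasks, the SQS API provides ChangeMessageVisibility, which resets the visibility timeout of a message that is already in flight. A minimal sketch that resets the timeout before each stage of a multi-step job, assuming FlowMQ supports this call, could look as follows. The function process_with_heartbeat and its steps argument are hypothetical.

```python
def process_with_heartbeat(sqs, queue_url, message, steps, timeout=60):
    """Run steps in order, resetting the visibility timeout before each one."""
    for step in steps:
        # Give the upcoming step a fresh window of `timeout` seconds
        sqs.change_message_visibility(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
            VisibilityTimeout=timeout,
        )
        step(message)
    # Delete only after every step has completed
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
    )
```

If a step raises, the message is not deleted and becomes visible again once the current timeout expires.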

Client Libraries

FlowMQ will work with any standard AWS SDK:

  • Python: boto3, aioboto3
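  • JavaScript: @aws-sdk/client-sqs (SDK v3), aws-sdk (SDK v2)
  • Java: software.amazon.awssdk:sqs (SDK v2), aws-java-sdk-sqs (SDK v1)
  • Go: github.com/aws/aws-sdk-go-v2, github.com/aws/aws-sdk-go
  • C#: AWSSDK.SQS
  • Rust: aws-sdk-sqs, rusoto_sqs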

Connection Parameters

Parameter           Default                            Description
Endpoint URL        https://your-namespace.flowmq.io   FlowMQ SQS endpoint
Region              us-east-1                          AWS region (for compatibility)
Max Messages        10                                 Maximum messages per receive
Wait Time           20s                                Long polling timeout
Visibility Timeout  30s                                Message visibility timeout

Next Steps

Future Roadmap

SQS support in FlowMQ is planned for future releases and will include:

  • Full SQS Compatibility: Complete AWS SQS API support
  • FIFO Queues: Ordered message processing
  • Dead Letter Queues: Failed message handling
  • Message Attributes: Rich metadata support
  • Enhanced Interoperability: Seamless bridging with all supported protocols