AWS SQS Protocol
FlowMQ plans to provide native AWS SQS (Simple Queue Service) compatibility in future releases, allowing you to use standard AWS SDKs and tools to connect, send, and receive messages. SQS in FlowMQ will map to queues for reliable, scalable message processing with excellent interoperability.
Overview
SQS in FlowMQ will map to the unified routing model where:
- SQS Queues map to Queues
- SQS Messages provide reliable delivery
- SQS Visibility Timeout ensures message processing
Connection
Connect to FlowMQ using standard AWS SDKs:
import boto3
# Configure SQS client for FlowMQ
sqs = boto3.client(
'sqs',
endpoint_url='https://your-namespace.flowmq.io',
region_name='us-east-1',
aws_access_key_id='your-access-key',
aws_secret_access_key='your-secret-key'
)
# Get queue URL
queue_url = 'https://your-namespace.flowmq.io/queues/order-processing'Sending Messages
Basic Message Sending
import boto3
import json
# Configure SQS client
sqs = boto3.client(
'sqs',
endpoint_url='https://your-namespace.flowmq.io',
region_name='us-east-1',
aws_access_key_id='your-access-key',
aws_secret_access_key='your-secret-key'
)
# Send message
response = sqs.send_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MessageBody=json.dumps({
'order_id': '12345',
'amount': 99.99,
'customer_id': 'user-123'
}),
MessageAttributes={
'source': {
'StringValue': 'order-service',
'DataType': 'String'
},
'priority': {
'StringValue': 'high',
'DataType': 'String'
}
}
)
print(f"Message sent: {response['MessageId']}")Batch Message Sending
# Send multiple messages in batch
messages = [
{
'Id': 'msg1',
'MessageBody': json.dumps({'order_id': '12345', 'amount': 99.99})
},
{
'Id': 'msg2',
'MessageBody': json.dumps({'order_id': '12346', 'amount': 149.99})
},
{
'Id': 'msg3',
'MessageBody': json.dumps({'order_id': '12347', 'amount': 79.99})
}
]
response = sqs.send_message_batch(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
Entries=messages
)
print(f"Batch sent: {len(response['Successful'])} successful, {len(response['Failed'])} failed")Delayed Messages
# Send message with delay (up to 15 minutes)
response = sqs.send_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MessageBody=json.dumps({'order_id': '12345'}),
DelaySeconds=300 # 5 minutes delay
)Receiving Messages
Basic Message Receiving
# Receive messages
response = sqs.receive_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MaxNumberOfMessages=10,
WaitTimeSeconds=20, # Long polling
VisibilityTimeout=300 # 5 minutes visibility timeout
)
messages = response.get('Messages', [])
for message in messages:
print(f"Received: {message['Body']}")
print(f"Message ID: {message['MessageId']}")
print(f"Receipt Handle: {message['ReceiptHandle']}")
# Process the message
process_order(json.loads(message['Body']))
# Delete the message after processing
sqs.delete_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
ReceiptHandle=message['ReceiptHandle']
)Long Polling
# Use long polling for efficient message receiving
response = sqs.receive_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MaxNumberOfMessages=10,
WaitTimeSeconds=20, # Long polling (up to 20 seconds)
VisibilityTimeout=300
)Message Attributes
# Receive messages with attributes
response = sqs.receive_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MaxNumberOfMessages=10,
MessageAttributeNames=['All'],
AttributeNames=['All']
)
for message in response.get('Messages', []):
print(f"Body: {message['Body']}")
print(f"Attributes: {message.get('MessageAttributes', {})}")
print(f"System Attributes: {message.get('Attributes', {})}")Queue Management
Creating Queues
# Create standard queue
response = sqs.create_queue(
QueueName='order-processing',
Attributes={
'VisibilityTimeout': '300', # 5 minutes
'MessageRetentionPeriod': '1209600', # 14 days
'MaximumMessageSize': '262144', # 256 KB
'DelaySeconds': '0',
'ReceiveMessageWaitTimeSeconds': '20' # Long polling
}
)
queue_url = response['QueueUrl']
print(f"Queue created: {queue_url}")Creating FIFO Queues
# Create FIFO queue for ordered message processing
response = sqs.create_queue(
QueueName='order-processing.fifo',
Attributes={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true',
'VisibilityTimeout': '300'
}
)Listing Queues
# List all queues
response = sqs.list_queues(
QueueNamePrefix='order' # Optional prefix filter
)
for queue_url in response.get('QueueUrls', []):
print(f"Queue: {queue_url}")Interoperability
SQS → MQTT
Messages sent via SQS can be consumed by MQTT clients:
# SQS Producer
sqs.send_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MessageBody=json.dumps({'order_id': '12345', 'amount': 99.99})
)# MQTT Consumer
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
print(f"Received: {msg.payload.decode()}")
client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("queues/order-processing")
client.loop_forever()MQTT → SQS
Messages published via MQTT can be consumed by SQS clients:
# MQTT Publisher
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.connect("your-namespace.flowmq.io", 1883)
client.publish("queues/order-processing", '{"order_id": "12345"}')# SQS Consumer
response = sqs.receive_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MaxNumberOfMessages=10,
WaitTimeSeconds=20
)
for message in response.get('Messages', []):
print(f"Received: {message['Body']}")
# Process message
sqs.delete_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
ReceiptHandle=message['ReceiptHandle']
)SQS → AMQP
SQS messages can be consumed by AMQP clients:
# SQS Producer
sqs.send_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MessageBody=json.dumps({'order_id': '12345', 'amount': 99.99})
)# AMQP Consumer
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()
channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='sqs', queue='order-processing', routing_key='order-processing')
def callback(ch, method, properties, body):
print(f"Received: {body}")
channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()Error Handling
import boto3
from botocore.exceptions import ClientError
try:
# Send message
response = sqs.send_message(
QueueUrl='https://your-namespace.flowmq.io/queues/order-processing',
MessageBody=json.dumps({'order_id': '12345'})
)
print(f"Message sent: {response['MessageId']}")
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'QueueDoesNotExist':
print("Queue does not exist")
elif error_code == 'InvalidMessageContents':
print("Invalid message format")
else:
print(f"SQS error: {e}")Dead Letter Queues
# Create dead letter queue
dlq_response = sqs.create_queue(
QueueName='order-processing-dlq',
Attributes={
'MessageRetentionPeriod': '1209600' # 14 days
}
)
# Create main queue with dead letter queue
response = sqs.create_queue(
QueueName='order-processing',
Attributes={
'VisibilityTimeout': '300',
'RedrivePolicy': json.dumps({
'deadLetterTargetArn': dlq_response['QueueUrl'],
'maxReceiveCount': '3'
})
}
)Best Practices
Use Long Polling
- Reduce API calls and costs
- Improve message delivery latency
- Set WaitTimeSeconds to 20 seconds
Handle Message Visibility
- Set appropriate visibility timeout
- Extend timeout for long-running tasks
- Delete messages after successful processing
Use Batch Operations
- Send/receive messages in batches
- Reduce API call overhead
- Improve throughput
Implement Dead Letter Queues
- Handle failed message processing
- Monitor and retry failed messages
- Set appropriate retry limits
Use Message Attributes
- Add metadata to messages
- Filter messages based on attributes
- Track message processing
Client Libraries
FlowMQ will work with any standard AWS SDK:
- Python:
boto3,aioboto3 - JavaScript:
aws-sdk,@aws-sdk/client-sqs - Java:
aws-java-sdk-sqs - Go:
github.com/aws/aws-sdk-go - C#:
AWSSDK.SQS - Rust:
rusoto_sqs
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Endpoint URL | https://your-namespace.flowmq.io | FlowMQ SQS endpoint |
| Region | us-east-1 | AWS region (for compatibility) |
| Max Messages | 10 | Maximum messages per receive |
| Wait Time | 20s | Long polling timeout |
| Visibility Timeout | 30s | Message visibility timeout |
Next Steps
- Learn about SNS Protocol
- Explore MQTT Protocol
- Check AMQP Protocol
Future Roadmap
SQS support in FlowMQ is planned for future releases and will include:
- Full SQS Compatibility: Complete AWS SQS API support
- FIFO Queues: Ordered message processing
- Dead Letter Queues: Failed message handling
- Message Attributes: Rich metadata support
- Enhanced Interoperability: Seamless bridging with all supported protocols