REST API Reference
FlowMQ provides a comprehensive REST API for managing your messaging infrastructure. This API allows you to create and manage namespaces, topics, queues, streams, subscriptions, and messages programmatically.
Base URL
All API endpoints are relative to your FlowMQ service URL:
https://{namespace}.flowmq.io/api/v1Authentication
The FlowMQ REST API uses API key authentication. Include your API key in the Authorization header:
Authorization: Bearer YOUR_API_KEYResponse Format
All API responses are returned in JSON format with the following structure:
{
"success": true,
"data": {
// Response data
},
"error": null
}Error responses follow this format:
{
"success": false,
"data": null,
"error": {
"code": "ERROR_CODE",
"message": "Human-readable error message",
"details": {}
}
}HTTP Status Codes
200 OK- Request successful201 Created- Resource created successfully400 Bad Request- Invalid request parameters401 Unauthorized- Invalid or missing API key403 Forbidden- Insufficient permissions404 Not Found- Resource not found409 Conflict- Resource already exists422 Unprocessable Entity- Validation error500 Internal Server Error- Server error
Namespaces
Get Namespace Information
Retrieve information about your current namespace.
GET /namespaceResponse:
{
"success": true,
"data": {
"name": "my-namespace",
"endpoint": "https://my-namespace.flowmq.io",
"created_at": "2024-01-15T10:30:00Z",
"status": "active"
},
"error": null
}Topics
Publish Message
Publish a message to a specific topic.
POST /topics/{topic_name}/messagesPath Parameters:
topic_name(string, required) - The topic name (e.g.,orders/eu/new)
Request Body:
{
"payload": "Hello, FlowMQ!",
"content_type": "text/plain",
"properties": {
"user_id": "12345",
"priority": "high"
},
"partition_key": "user-12345"
}Request Fields:
payload(string, required) - The message contentcontent_type(string, optional) - MIME type of the payload (default:application/json)properties(object, optional) - Custom message propertiespartition_key(string, optional) - Key for partitioning messages
Response:
{
"success": true,
"data": {
"message_id": "msg_abc123",
"topic": "orders/eu/new",
"timestamp": "2024-01-15T10:30:00Z",
"size": 256
},
"error": null
}Publish Batch Messages
Publish multiple messages to a topic in a single request.
POST /topics/{topic_name}/messages/batchRequest Body:
{
"messages": [
{
"payload": "Message 1",
"properties": {"batch_id": "batch-1"}
},
{
"payload": "Message 2",
"properties": {"batch_id": "batch-1"}
}
]
}Response:
{
"success": true,
"data": {
"published_count": 2,
"message_ids": ["msg_abc123", "msg_def456"],
"timestamp": "2024-01-15T10:30:00Z"
},
"error": null
}Queues
Create Queue
Create a new queue with topic binding.
POST /queuesRequest Body:
{
"name": "order-processing-queue",
"topic_filter": "orders/#",
"max_size_mb": 1024,
"message_ttl_seconds": 86400,
"max_retries": 3,
"dead_letter_topic": "orders/dead-letter"
}Request Fields:
name(string, required) - Unique queue nametopic_filter(string, required) - Topic filter pattern (e.g.,orders/#,sensors/+/temperature)max_size_mb(integer, optional) - Maximum queue size in MB (default: 1024)message_ttl_seconds(integer, optional) - Message time-to-live in seconds (default: 86400)max_retries(integer, optional) - Maximum retry attempts for failed messages (default: 3)dead_letter_topic(string, optional) - Topic for dead letter messages
Response:
{
"success": true,
"data": {
"name": "order-processing-queue",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 0,
"consumer_count": 0
},
"error": null
}List Queues
Retrieve all queues in the namespace.
GET /queuesQuery Parameters:
limit(integer, optional) - Maximum number of queues to return (default: 50, max: 100)offset(integer, optional) - Number of queues to skip (default: 0)
Response:
{
"success": true,
"data": {
"queues": [
{
"name": "order-processing-queue",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 42,
"consumer_count": 3
}
],
"total": 1,
"limit": 50,
"offset": 0
},
"error": null
}Get Queue
Retrieve information about a specific queue.
GET /queues/{queue_name}Response:
{
"success": true,
"data": {
"name": "order-processing-queue",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 42,
"consumer_count": 3,
"max_size_mb": 1024,
"message_ttl_seconds": 86400,
"max_retries": 3,
"dead_letter_topic": "orders/dead-letter"
},
"error": null
}Delete Queue
Delete a queue and all its messages.
DELETE /queues/{queue_name}Response:
{
"success": true,
"data": {
"deleted_at": "2024-01-15T10:30:00Z"
},
"error": null
}Consume Messages
Consume messages from a queue.
POST /queues/{queue_name}/messages/consumeRequest Body:
{
"max_messages": 10,
"timeout_seconds": 30,
"visibility_timeout_seconds": 300
}Request Fields:
max_messages(integer, optional) - Maximum number of messages to consume (default: 1, max: 100)timeout_seconds(integer, optional) - Long polling timeout (default: 30)visibility_timeout_seconds(integer, optional) - Message visibility timeout (default: 300)
Response:
{
"success": true,
"data": {
"messages": [
{
"message_id": "msg_abc123",
"payload": "Order #12345",
"content_type": "application/json",
"properties": {
"user_id": "12345",
"priority": "high"
},
"publish_time": "2024-01-15T10:30:00Z",
"receive_count": 1,
"receipt_handle": "receipt_handle_xyz789"
}
],
"count": 1
},
"error": null
}Acknowledge Message
Acknowledge successful processing of a message.
POST /queues/{queue_name}/messages/{message_id}/ackRequest Body:
{
"receipt_handle": "receipt_handle_xyz789"
}Response:
{
"success": true,
"data": {
"acknowledged_at": "2024-01-15T10:30:00Z"
},
"error": null
}Streams
Create Stream
Create a new stream with topic binding.
POST /streamsRequest Body:
{
"name": "order-events-stream",
"topic_filter": "orders/#",
"retention_days": 30,
"max_size_gb": 100,
"partition_count": 3
}Request Fields:
name(string, required) - Unique stream nametopic_filter(string, required) - Topic filter patternretention_days(integer, optional) - Message retention period in days (default: 7)max_size_gb(integer, optional) - Maximum stream size in GB (default: 10)partition_count(integer, optional) - Number of partitions (default: 1)
Response:
{
"success": true,
"data": {
"name": "order-events-stream",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 0,
"consumer_count": 0,
"retention_days": 30,
"max_size_gb": 100,
"partition_count": 3
},
"error": null
}List Streams
Retrieve all streams in the namespace.
GET /streamsQuery Parameters:
limit(integer, optional) - Maximum number of streams to return (default: 50, max: 100)offset(integer, optional) - Number of streams to skip (default: 0)
Response:
{
"success": true,
"data": {
"streams": [
{
"name": "order-events-stream",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 1250,
"consumer_count": 2
}
],
"total": 1,
"limit": 50,
"offset": 0
},
"error": null
}Get Stream
Retrieve information about a specific stream.
GET /streams/{stream_name}Response:
{
"success": true,
"data": {
"name": "order-events-stream",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 1250,
"consumer_count": 2,
"retention_days": 30,
"max_size_gb": 100,
"partition_count": 3,
"earliest_offset": 0,
"latest_offset": 1249
},
"error": null
}Delete Stream
Delete a stream and all its messages.
DELETE /streams/{stream_name}Response:
{
"success": true,
"data": {
"deleted_at": "2024-01-15T10:30:00Z"
},
"error": null
}Consume from Stream
Consume messages from a stream.
POST /streams/{stream_name}/messages/consumeRequest Body:
{
"consumer_group": "analytics-group",
"start_offset": "latest",
"max_messages": 100,
"timeout_seconds": 30
}Request Fields:
consumer_group(string, required) - Consumer group namestart_offset(string, optional) - Starting offset (earliest,latest, or specific offset)max_messages(integer, optional) - Maximum number of messages to consume (default: 100, max: 1000)timeout_seconds(integer, optional) - Long polling timeout (default: 30)
Response:
{
"success": true,
"data": {
"messages": [
{
"offset": 1247,
"partition": 0,
"payload": "Order #12345",
"content_type": "application/json",
"properties": {
"user_id": "12345",
"priority": "high"
},
"publish_time": "2024-01-15T10:30:00Z"
}
],
"next_offset": 1248,
"count": 1
},
"error": null
}Commit Offset
Commit the current offset for a consumer group.
POST /streams/{stream_name}/offsets/commitRequest Body:
{
"consumer_group": "analytics-group",
"offset": 1248
}Response:
{
"success": true,
"data": {
"committed_at": "2024-01-15T10:30:00Z"
},
"error": null
}Subscriptions
Create Subscription
Create a new subscription for pub/sub messaging.
POST /subscriptionsRequest Body:
{
"name": "order-notifications",
"topic_filter": "orders/#",
"max_size_mb": 512,
"message_ttl_seconds": 3600
}Request Fields:
name(string, required) - Unique subscription nametopic_filter(string, required) - Topic filter patternmax_size_mb(integer, optional) - Maximum subscription size in MB (default: 512)message_ttl_seconds(integer, optional) - Message time-to-live in seconds (default: 3600)
Response:
{
"success": true,
"data": {
"name": "order-notifications",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 0,
"subscriber_count": 0
},
"error": null
}List Subscriptions
Retrieve all subscriptions in the namespace.
GET /subscriptionsQuery Parameters:
limit(integer, optional) - Maximum number of subscriptions to return (default: 50, max: 100)offset(integer, optional) - Number of subscriptions to skip (default: 0)
Response:
{
"success": true,
"data": {
"subscriptions": [
{
"name": "order-notifications",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 15,
"subscriber_count": 3
}
],
"total": 1,
"limit": 50,
"offset": 0
},
"error": null
}Get Subscription
Retrieve information about a specific subscription.
GET /subscriptions/{subscription_name}Response:
{
"success": true,
"data": {
"name": "order-notifications",
"topic_filter": "orders/#",
"status": "active",
"created_at": "2024-01-15T10:30:00Z",
"message_count": 15,
"subscriber_count": 3,
"max_size_mb": 512,
"message_ttl_seconds": 3600
},
"error": null
}Delete Subscription
Delete a subscription.
DELETE /subscriptions/{subscription_name}Response:
{
"success": true,
"data": {
"deleted_at": "2024-01-15T10:30:00Z"
},
"error": null
}Consume from Subscription
Consume messages from a subscription.
POST /subscriptions/{subscription_name}/messages/consumeRequest Body:
{
"subscriber_id": "notification-service-1",
"max_messages": 10,
"timeout_seconds": 30
}Request Fields:
subscriber_id(string, required) - Unique subscriber identifiermax_messages(integer, optional) - Maximum number of messages to consume (default: 10, max: 100)timeout_seconds(integer, optional) - Long polling timeout (default: 30)
Response:
{
"success": true,
"data": {
"messages": [
{
"message_id": "msg_abc123",
"payload": "Order #12345",
"content_type": "application/json",
"properties": {
"user_id": "12345",
"priority": "high"
},
"publish_time": "2024-01-15T10:30:00Z"
}
],
"count": 1
},
"error": null
}Error Codes
| Code | Description |
|---|---|
INVALID_API_KEY | Invalid or missing API key |
NAMESPACE_NOT_FOUND | Namespace does not exist |
QUEUE_ALREADY_EXISTS | Queue with the same name already exists |
STREAM_ALREADY_EXISTS | Stream with the same name already exists |
SUBSCRIPTION_ALREADY_EXISTS | Subscription with the same name already exists |
QUEUE_NOT_FOUND | Queue does not exist |
STREAM_NOT_FOUND | Stream does not exist |
SUBSCRIPTION_NOT_FOUND | Subscription does not exist |
TOPIC_NOT_FOUND | Topic does not exist |
INVALID_TOPIC_FILTER | Invalid topic filter syntax |
MESSAGE_TOO_LARGE | Message payload exceeds size limit |
RATE_LIMIT_EXCEEDED | API rate limit exceeded |
INSUFFICIENT_PERMISSIONS | Insufficient permissions for the operation |
INTERNAL_ERROR | Internal server error |
Rate Limits
The FlowMQ REST API implements rate limiting to ensure fair usage:
- Publish operations: 1000 requests per minute per namespace
- Consume operations: 500 requests per minute per namespace
- Management operations: 100 requests per minute per namespace
Rate limit headers are included in responses:
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 950
X-RateLimit-Reset: 1642248600SDKs and Libraries
For easier integration, FlowMQ provides official SDKs and libraries:
Examples
Complete Workflow Example
Here's a complete example of creating a queue, publishing messages, and consuming them:
# 1. Create a queue
curl -X POST https://my-namespace.flowmq.io/api/v1/queues \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "order-processing",
"topic_filter": "orders/#"
}'
# 2. Publish a message
curl -X POST https://my-namespace.flowmq.io/api/v1/topics/orders/eu/new/messages \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"payload": "{\"order_id\": \"12345\", \"amount\": 99.99}",
"content_type": "application/json",
"properties": {
"user_id": "12345",
"priority": "high"
}
}'
# 3. Consume messages
curl -X POST https://my-namespace.flowmq.io/api/v1/queues/order-processing/messages/consume \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"max_messages": 10,
"timeout_seconds": 30
}'
# 4. Acknowledge message
curl -X POST https://my-namespace.flowmq.io/api/v1/queues/order-processing/messages/msg_abc123/ack \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"receipt_handle": "receipt_handle_xyz789"
}'This REST API provides full programmatic access to all FlowMQ features, enabling you to build powerful messaging applications and integrations.