REST API Reference

FlowMQ provides a REST API for managing your messaging infrastructure. Use it to create and manage namespaces, topics, queues, streams, subscriptions, and messages programmatically.

Base URL

All API endpoints are relative to your FlowMQ service URL:

https://{namespace}.flowmq.io/api/v1

Authentication

The FlowMQ REST API uses API key authentication. Include your API key in the Authorization header:

Authorization: Bearer YOUR_API_KEY
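
As a minimal sketch of how the base URL and the Authorization header fit together, the Python snippet below builds (but does not send) an authenticated request using only the standard library. The helper name `build_request` and the placeholder namespace and key are illustrative and not part of FlowMQ.

```python
import json
import urllib.request

API_PREFIX = "/api/v1"


def build_request(namespace, api_key, method, path, body=None):
    """Build an authenticated Request for a FlowMQ endpoint without sending it."""
    url = f"https://{namespace}.flowmq.io{API_PREFIX}{path}"
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Authorization": f"Bearer {api_key}"}
    if data is not None:
        # Only requests that carry a JSON body need a Content-Type header.
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(url, data=data, headers=headers, method=method)


req = build_request("my-namespace", "YOUR_API_KEY", "GET", "/namespace")
print(req.full_url)
print(req.get_header("Authorization"))
```

Passing the returned object to `urllib.request.urlopen` would perform the call; that step is left out so the sketch runs without network access.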

Response Format

All API responses are returned in JSON format with the following structure:

json
{
  "success": true,
  "data": {
    // Response data
  },
  "error": null
}

Error responses follow this format:

json
{
  "success": false,
  "data": null,
  "error": {
    "code": "ERROR_CODE",
    "message": "Human-readable error message",
    "details": {}
  }
}
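
Because every response shares this envelope, a client can unwrap it in one place. The sketch below is one possible approach in Python; `FlowMQError`, `unwrap`, and the `UNKNOWN` fallback code are hypothetical names chosen for illustration.

```python
import json


class FlowMQError(Exception):
    """Raised when the envelope reports success=false (hypothetical class)."""

    def __init__(self, code, message, details=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.details = details or {}


def unwrap(raw_body):
    """Return the `data` field of a response body, or raise FlowMQError."""
    envelope = json.loads(raw_body)
    if envelope.get("success"):
        return envelope.get("data")
    error = envelope.get("error") or {}
    # "UNKNOWN" is a local fallback for a malformed envelope, not a FlowMQ code.
    raise FlowMQError(
        error.get("code", "UNKNOWN"),
        error.get("message", ""),
        error.get("details"),
    )


ok = unwrap('{"success": true, "data": {"name": "my-namespace"}, "error": null}')
print(ok["name"])
```

Callers can then branch on `exc.code` (for example `QUEUE_NOT_FOUND`) instead of inspecting raw JSON.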

HTTP Status Codes

  • 200 OK - Request successful
  • 201 Created - Resource created successfully
  • 400 Bad Request - Invalid request parameters
  • 401 Unauthorized - Invalid or missing API key
  • 403 Forbidden - Insufficient permissions
  • 404 Not Found - Resource not found
  • 409 Conflict - Resource already exists
  • 422 Unprocessable Entity - Validation error
  • 500 Internal Server Error - Server error

Namespaces

Get Namespace Information

Retrieve information about your current namespace.

http
GET /namespace

Response:

json
{
  "success": true,
  "data": {
    "name": "my-namespace",
    "endpoint": "https://my-namespace.flowmq.io",
    "created_at": "2024-01-15T10:30:00Z",
    "status": "active"
  },
  "error": null
}

Topics

Publish Message

Publish a message to a specific topic.

http
POST /topics/{topic_name}/messages

Path Parameters:

  • topic_name (string, required) - The topic name (e.g., orders/eu/new)

Request Body:

json
{
  "payload": "Hello, FlowMQ!",
  "content_type": "text/plain",
  "properties": {
    "user_id": "12345",
    "priority": "high"
  },
  "partition_key": "user-12345"
}

Request Fields:

  • payload (string, required) - The message content
  • content_type (string, optional) - MIME type of the payload (default: application/json)
  • properties (object, optional) - Custom message properties
  • partition_key (string, optional) - Key for partitioning messages

Response:

json
{
  "success": true,
  "data": {
    "message_id": "msg_abc123",
    "topic": "orders/eu/new",
    "timestamp": "2024-01-15T10:30:00Z",
    "size": 256
  },
  "error": null
}
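
Since `payload` is a string, structured data has to be serialized before it is placed in the request body. The following Python sketch assembles the path and body for a publish call; the helper name `build_publish` is an assumption, and the topic name is inserted into the path unencoded, as in the curl example in this reference.

```python
import json


def build_publish(topic, payload, properties=None, partition_key=None):
    """Return (path, body) for POST /topics/{topic_name}/messages."""
    if isinstance(payload, str):
        body = {"payload": payload, "content_type": "text/plain"}
    else:
        # The payload field is a string, so dicts and lists are JSON-encoded first.
        body = {"payload": json.dumps(payload), "content_type": "application/json"}
    if properties:
        body["properties"] = properties
    if partition_key is not None:
        body["partition_key"] = partition_key
    return f"/topics/{topic}/messages", body


path, body = build_publish(
    "orders/eu/new",
    {"order_id": "12345", "amount": 99.99},
    properties={"user_id": "12345"},
    partition_key="user-12345",
)
print(path)
print(json.dumps(body, indent=2))
```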

Publish Batch Messages

Publish multiple messages to a topic in a single request.

http
POST /topics/{topic_name}/messages/batch

Request Body:

json
{
  "messages": [
    {
      "payload": "Message 1",
      "properties": {"batch_id": "batch-1"}
    },
    {
      "payload": "Message 2",
      "properties": {"batch_id": "batch-1"}
    }
  ]
}

Response:

json
{
  "success": true,
  "data": {
    "published_count": 2,
    "message_ids": ["msg_abc123", "msg_def456"],
    "timestamp": "2024-01-15T10:30:00Z"
  },
  "error": null
}
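
When there are many messages to send, a client will usually split them into several batch requests. The Python sketch below shows one way to do that. The `batch_size` cap is an assumed client-side value, since this reference does not document a maximum batch size, and `build_batches` is a hypothetical helper.

```python
def build_batches(payloads, batch_id, batch_size=100):
    """Yield request bodies for POST /topics/{topic_name}/messages/batch.

    batch_size is an assumed client-side cap; the reference does not
    document a maximum number of messages per batch.
    """
    for start in range(0, len(payloads), batch_size):
        chunk = payloads[start:start + batch_size]
        yield {
            "messages": [
                {"payload": p, "properties": {"batch_id": batch_id}}
                for p in chunk
            ]
        }


bodies = list(
    build_batches(["Message 1", "Message 2", "Message 3"], "batch-1", batch_size=2)
)
print(len(bodies))
```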

Queues

Create Queue

Create a new queue bound to a topic filter.

http
POST /queues

Request Body:

json
{
  "name": "order-processing-queue",
  "topic_filter": "orders/#",
  "max_size_mb": 1024,
  "message_ttl_seconds": 86400,
  "max_retries": 3,
  "dead_letter_topic": "orders/dead-letter"
}

Request Fields:

  • name (string, required) - Unique queue name
  • topic_filter (string, required) - Topic filter pattern (e.g., orders/#, sensors/+/temperature)
  • max_size_mb (integer, optional) - Maximum queue size in MB (default: 1024)
  • message_ttl_seconds (integer, optional) - Message time-to-live in seconds (default: 86400)
  • max_retries (integer, optional) - Maximum retry attempts for failed messages (default: 3)
  • dead_letter_topic (string, optional) - Topic for dead letter messages
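
The filter examples `orders/#` and `sensors/+/temperature` resemble MQTT wildcards. This reference does not define the matching rules, so the Python sketch below only illustrates what they would mean under MQTT-style semantics (`+` matches exactly one level, `#` matches all remaining levels). Treat both the semantics and the function name `filter_matches` as assumptions.

```python
def filter_matches(topic_filter, topic):
    """Match a topic against a filter, ASSUMING MQTT-style wildcards."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":
            # Multi-level wildcard: valid only as the last level, where it
            # matches the parent level and everything below it.
            return i == len(f_parts) - 1
        if i >= len(t_parts):
            return False
        if part != "+" and part != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)


print(filter_matches("orders/#", "orders/eu/new"))
print(filter_matches("sensors/+/temperature", "sensors/kitchen/humidity"))
```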

Response:

json
{
  "success": true,
  "data": {
    "name": "order-processing-queue",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 0,
    "consumer_count": 0
  },
  "error": null
}

List Queues

Retrieve all queues in the namespace.

http
GET /queues

Query Parameters:

  • limit (integer, optional) - Maximum number of queues to return (default: 50, max: 100)
  • offset (integer, optional) - Number of queues to skip (default: 0)

Response:

json
{
  "success": true,
  "data": {
    "queues": [
      {
        "name": "order-processing-queue",
        "topic_filter": "orders/#",
        "status": "active",
        "created_at": "2024-01-15T10:30:00Z",
        "message_count": 42,
        "consumer_count": 3
      }
    ],
    "total": 1,
    "limit": 50,
    "offset": 0
  },
  "error": null
}
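
The `limit`, `offset`, and `total` fields are enough to walk the full list page by page. The Python sketch below shows the loop; `fetch_page` is a hypothetical caller-supplied function standing in for the actual `GET /queues` call, and `fake_fetch` replaces it here so the example runs offline.

```python
def iter_queues(fetch_page, limit=50):
    """Yield every queue by walking limit/offset pages.

    fetch_page(limit, offset) is a caller-supplied function (hypothetical)
    that performs GET /queues and returns the response's `data` object.
    """
    offset = 0
    while True:
        data = fetch_page(limit, offset)
        queues = data["queues"]
        yield from queues
        offset += len(queues)
        # Stop on an empty page or once every reported queue has been seen.
        if not queues or offset >= data["total"]:
            break


# Stand-in for the real HTTP call, serving three queues from memory.
_ALL_QUEUES = [{"name": f"queue-{i}"} for i in range(3)]


def fake_fetch(limit, offset):
    return {
        "queues": _ALL_QUEUES[offset:offset + limit],
        "total": len(_ALL_QUEUES),
        "limit": limit,
        "offset": offset,
    }


names = [q["name"] for q in iter_queues(fake_fetch, limit=2)]
print(names)
```

The same loop applies to `GET /streams` and `GET /subscriptions`, which use identical pagination fields with a different list key.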

Get Queue

Retrieve information about a specific queue.

http
GET /queues/{queue_name}

Response:

json
{
  "success": true,
  "data": {
    "name": "order-processing-queue",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 42,
    "consumer_count": 3,
    "max_size_mb": 1024,
    "message_ttl_seconds": 86400,
    "max_retries": 3,
    "dead_letter_topic": "orders/dead-letter"
  },
  "error": null
}

Delete Queue

Delete a queue and all its messages.

http
DELETE /queues/{queue_name}

Response:

json
{
  "success": true,
  "data": {
    "deleted_at": "2024-01-15T10:30:00Z"
  },
  "error": null
}

Consume Messages

Consume messages from a queue.

http
POST /queues/{queue_name}/messages/consume

Request Body:

json
{
  "max_messages": 10,
  "timeout_seconds": 30,
  "visibility_timeout_seconds": 300
}

Request Fields:

  • max_messages (integer, optional) - Maximum number of messages to consume (default: 1, max: 100)
  • timeout_seconds (integer, optional) - Long-polling timeout in seconds (default: 30)
  • visibility_timeout_seconds (integer, optional) - Message visibility timeout in seconds (default: 300)

Response:

json
{
  "success": true,
  "data": {
    "messages": [
      {
        "message_id": "msg_abc123",
        "payload": "Order #12345",
        "content_type": "application/json",
        "properties": {
          "user_id": "12345",
          "priority": "high"
        },
        "publish_time": "2024-01-15T10:30:00Z",
        "receive_count": 1,
        "receipt_handle": "receipt_handle_xyz789"
      }
    ],
    "count": 1
  },
  "error": null
}

Acknowledge Message

Acknowledge successful processing of a message.

http
POST /queues/{queue_name}/messages/{message_id}/ack

Request Body:

json
{
  "receipt_handle": "receipt_handle_xyz789"
}

Response:

json
{
  "success": true,
  "data": {
    "acknowledged_at": "2024-01-15T10:30:00Z"
  },
  "error": null
}
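
Consuming and acknowledging are normally paired: each message's `message_id` and `receipt_handle` from the consume response feed the ack request. The Python sketch below shows that pairing. `post` is a hypothetical caller-supplied function that sends an authenticated POST and returns the response's `data` object, and `fake_queue_post` replaces it so the example runs offline.

```python
def process_batch(post, queue_name, handler, max_messages=10):
    """Consume one batch and acknowledge each message the handler accepts."""
    data = post(
        f"/queues/{queue_name}/messages/consume",
        {"max_messages": max_messages, "timeout_seconds": 30},
    )
    acked = 0
    for msg in data["messages"]:
        try:
            handler(msg)
        except Exception:
            # Skip the ack for a failed message. That it is redelivered after
            # the visibility timeout is an assumption inferred from the
            # visibility_timeout_seconds and max_retries fields.
            continue
        post(
            f"/queues/{queue_name}/messages/{msg['message_id']}/ack",
            {"receipt_handle": msg["receipt_handle"]},
        )
        acked += 1
    return acked


calls = []


def fake_queue_post(path, body):
    """Offline stand-in that records calls and returns canned responses."""
    calls.append((path, body))
    if path.endswith("/consume"):
        return {
            "messages": [
                {
                    "message_id": "msg_abc123",
                    "payload": "Order #12345",
                    "receipt_handle": "receipt_handle_xyz789",
                }
            ],
            "count": 1,
        }
    return {"acknowledged_at": "2024-01-15T10:30:00Z"}


acked_count = process_batch(fake_queue_post, "order-processing", lambda m: None)
print(acked_count)
```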

Streams

Create Stream

Create a new stream bound to a topic filter.

http
POST /streams

Request Body:

json
{
  "name": "order-events-stream",
  "topic_filter": "orders/#",
  "retention_days": 30,
  "max_size_gb": 100,
  "partition_count": 3
}

Request Fields:

  • name (string, required) - Unique stream name
  • topic_filter (string, required) - Topic filter pattern
  • retention_days (integer, optional) - Message retention period in days (default: 7)
  • max_size_gb (integer, optional) - Maximum stream size in GB (default: 10)
  • partition_count (integer, optional) - Number of partitions (default: 1)

Response:

json
{
  "success": true,
  "data": {
    "name": "order-events-stream",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 0,
    "consumer_count": 0,
    "retention_days": 30,
    "max_size_gb": 100,
    "partition_count": 3
  },
  "error": null
}

List Streams

Retrieve all streams in the namespace.

http
GET /streams

Query Parameters:

  • limit (integer, optional) - Maximum number of streams to return (default: 50, max: 100)
  • offset (integer, optional) - Number of streams to skip (default: 0)

Response:

json
{
  "success": true,
  "data": {
    "streams": [
      {
        "name": "order-events-stream",
        "topic_filter": "orders/#",
        "status": "active",
        "created_at": "2024-01-15T10:30:00Z",
        "message_count": 1250,
        "consumer_count": 2
      }
    ],
    "total": 1,
    "limit": 50,
    "offset": 0
  },
  "error": null
}

Get Stream

Retrieve information about a specific stream.

http
GET /streams/{stream_name}

Response:

json
{
  "success": true,
  "data": {
    "name": "order-events-stream",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 1250,
    "consumer_count": 2,
    "retention_days": 30,
    "max_size_gb": 100,
    "partition_count": 3,
    "earliest_offset": 0,
    "latest_offset": 1249
  },
  "error": null
}

Delete Stream

Delete a stream and all its messages.

http
DELETE /streams/{stream_name}

Response:

json
{
  "success": true,
  "data": {
    "deleted_at": "2024-01-15T10:30:00Z"
  },
  "error": null
}

Consume from Stream

Consume messages from a stream.

http
POST /streams/{stream_name}/messages/consume

Request Body:

json
{
  "consumer_group": "analytics-group",
  "start_offset": "latest",
  "max_messages": 100,
  "timeout_seconds": 30
}

Request Fields:

  • consumer_group (string, required) - Consumer group name
  • start_offset (string, optional) - Starting offset (earliest, latest, or specific offset)
  • max_messages (integer, optional) - Maximum number of messages to consume (default: 100, max: 1000)
  • timeout_seconds (integer, optional) - Long-polling timeout in seconds (default: 30)

Response:

json
{
  "success": true,
  "data": {
    "messages": [
      {
        "offset": 1247,
        "partition": 0,
        "payload": "Order #12345",
        "content_type": "application/json",
        "properties": {
          "user_id": "12345",
          "priority": "high"
        },
        "publish_time": "2024-01-15T10:30:00Z"
      }
    ],
    "next_offset": 1248,
    "count": 1
  },
  "error": null
}

Commit Offset

Commit the current offset for a consumer group.

http
POST /streams/{stream_name}/offsets/commit

Request Body:

json
{
  "consumer_group": "analytics-group",
  "offset": 1248
}

Response:

json
{
  "success": true,
  "data": {
    "committed_at": "2024-01-15T10:30:00Z"
  },
  "error": null
}
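
The examples above suggest a consume-then-commit cycle: the consume response returns `next_offset` (1248), and the commit request sends that same value. The Python sketch below follows that reading, which is inferred from the sample values rather than stated by the reference. `post` is a hypothetical caller-supplied function that sends an authenticated POST and returns the response's `data` object.

```python
def consume_and_commit(post, stream_name, consumer_group, handler):
    """Consume one batch from a stream, then commit the returned next_offset."""
    data = post(
        f"/streams/{stream_name}/messages/consume",
        {
            "consumer_group": consumer_group,
            "max_messages": 100,
            "timeout_seconds": 30,
        },
    )
    for msg in data["messages"]:
        # An exception here aborts before the commit, so nothing is committed
        # for a batch that was only partly processed.
        handler(msg)
    if data["messages"]:
        post(
            f"/streams/{stream_name}/offsets/commit",
            {"consumer_group": consumer_group, "offset": data["next_offset"]},
        )
    return data["count"]


sent = []


def fake_stream_post(path, body):
    """Offline stand-in that records calls and returns canned responses."""
    sent.append((path, body))
    if path.endswith("/consume"):
        return {
            "messages": [{"offset": 1247, "partition": 0, "payload": "Order #12345"}],
            "next_offset": 1248,
            "count": 1,
        }
    return {"committed_at": "2024-01-15T10:30:00Z"}


consumed = consume_and_commit(
    fake_stream_post, "order-events-stream", "analytics-group", lambda m: None
)
print(consumed)
```

Committing only after the handler has processed every message gives at-least-once behavior on the client side: a crash mid-batch leaves the offset uncommitted.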

Subscriptions

Create Subscription

Create a new subscription for pub/sub messaging.

http
POST /subscriptions

Request Body:

json
{
  "name": "order-notifications",
  "topic_filter": "orders/#",
  "max_size_mb": 512,
  "message_ttl_seconds": 3600
}

Request Fields:

  • name (string, required) - Unique subscription name
  • topic_filter (string, required) - Topic filter pattern
  • max_size_mb (integer, optional) - Maximum subscription size in MB (default: 512)
  • message_ttl_seconds (integer, optional) - Message time-to-live in seconds (default: 3600)

Response:

json
{
  "success": true,
  "data": {
    "name": "order-notifications",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 0,
    "subscriber_count": 0
  },
  "error": null
}

List Subscriptions

Retrieve all subscriptions in the namespace.

http
GET /subscriptions

Query Parameters:

  • limit (integer, optional) - Maximum number of subscriptions to return (default: 50, max: 100)
  • offset (integer, optional) - Number of subscriptions to skip (default: 0)

Response:

json
{
  "success": true,
  "data": {
    "subscriptions": [
      {
        "name": "order-notifications",
        "topic_filter": "orders/#",
        "status": "active",
        "created_at": "2024-01-15T10:30:00Z",
        "message_count": 15,
        "subscriber_count": 3
      }
    ],
    "total": 1,
    "limit": 50,
    "offset": 0
  },
  "error": null
}

Get Subscription

Retrieve information about a specific subscription.

http
GET /subscriptions/{subscription_name}

Response:

json
{
  "success": true,
  "data": {
    "name": "order-notifications",
    "topic_filter": "orders/#",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z",
    "message_count": 15,
    "subscriber_count": 3,
    "max_size_mb": 512,
    "message_ttl_seconds": 3600
  },
  "error": null
}

Delete Subscription

Delete a subscription.

http
DELETE /subscriptions/{subscription_name}

Response:

json
{
  "success": true,
  "data": {
    "deleted_at": "2024-01-15T10:30:00Z"
  },
  "error": null
}

Consume from Subscription

Consume messages from a subscription.

http
POST /subscriptions/{subscription_name}/messages/consume

Request Body:

json
{
  "subscriber_id": "notification-service-1",
  "max_messages": 10,
  "timeout_seconds": 30
}

Request Fields:

  • subscriber_id (string, required) - Unique subscriber identifier
  • max_messages (integer, optional) - Maximum number of messages to consume (default: 10, max: 100)
  • timeout_seconds (integer, optional) - Long-polling timeout in seconds (default: 30)

Response:

json
{
  "success": true,
  "data": {
    "messages": [
      {
        "message_id": "msg_abc123",
        "payload": "Order #12345",
        "content_type": "application/json",
        "properties": {
          "user_id": "12345",
          "priority": "high"
        },
        "publish_time": "2024-01-15T10:30:00Z"
      }
    ],
    "count": 1
  },
  "error": null
}

Error Codes

| Code                        | Description                                    |
|-----------------------------|------------------------------------------------|
| INVALID_API_KEY             | Invalid or missing API key                     |
| NAMESPACE_NOT_FOUND         | Namespace does not exist                       |
| QUEUE_ALREADY_EXISTS        | Queue with the same name already exists        |
| STREAM_ALREADY_EXISTS       | Stream with the same name already exists       |
| SUBSCRIPTION_ALREADY_EXISTS | Subscription with the same name already exists |
| QUEUE_NOT_FOUND             | Queue does not exist                           |
| STREAM_NOT_FOUND            | Stream does not exist                          |
| SUBSCRIPTION_NOT_FOUND      | Subscription does not exist                    |
| TOPIC_NOT_FOUND             | Topic does not exist                           |
| INVALID_TOPIC_FILTER        | Invalid topic filter syntax                    |
| MESSAGE_TOO_LARGE           | Message payload exceeds size limit             |
| RATE_LIMIT_EXCEEDED         | API rate limit exceeded                        |
| INSUFFICIENT_PERMISSIONS    | Insufficient permissions for the operation     |
| INTERNAL_ERROR              | Internal server error                          |

Rate Limits

The FlowMQ REST API implements rate limiting to ensure fair usage:

  • Publish operations: 1000 requests per minute per namespace
  • Consume operations: 500 requests per minute per namespace
  • Management operations: 100 requests per minute per namespace

Rate limit headers are included in responses:

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 950
X-RateLimit-Reset: 1642248600
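
A client can use these headers to pause before it exhausts its quota. The Python sketch below computes how long to wait. It assumes that `X-RateLimit-Reset` is a Unix timestamp in seconds, which the sample value suggests but the reference does not state, and `seconds_until_reset` is a hypothetical helper name.

```python
import time


def seconds_until_reset(headers, now=None):
    """Return how long to wait before the next request, or 0 if quota remains.

    Assumes X-RateLimit-Reset is a Unix timestamp in seconds. `headers` is a
    plain dict here, so the lookup is case-sensitive; real HTTP header names
    are not.
    """
    now = time.time() if now is None else now
    if int(headers.get("X-RateLimit-Remaining", "1")) > 0:
        return 0.0
    reset_at = int(headers.get("X-RateLimit-Reset", "0"))
    # Never return a negative wait if the reset time has already passed.
    return max(0.0, reset_at - now)


wait = seconds_until_reset(
    {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1642248600"},
    now=1642248570,
)
print(wait)
```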

SDKs and Libraries

For easier integration, FlowMQ provides official SDKs and libraries.


Examples

Complete Workflow Example

The following example creates a queue, publishes a message to a matching topic, consumes it, and acknowledges it:

bash
# 1. Create a queue
curl -X POST https://my-namespace.flowmq.io/api/v1/queues \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "order-processing",
    "topic_filter": "orders/#"
  }'

# 2. Publish a message
curl -X POST https://my-namespace.flowmq.io/api/v1/topics/orders/eu/new/messages \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "payload": "{\"order_id\": \"12345\", \"amount\": 99.99}",
    "content_type": "application/json",
    "properties": {
      "user_id": "12345",
      "priority": "high"
    }
  }'

# 3. Consume messages
curl -X POST https://my-namespace.flowmq.io/api/v1/queues/order-processing/messages/consume \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "max_messages": 10,
    "timeout_seconds": 30
  }'

# 4. Acknowledge the message, using the message_id and receipt_handle returned in step 3
curl -X POST https://my-namespace.flowmq.io/api/v1/queues/order-processing/messages/msg_abc123/ack \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "receipt_handle": "receipt_handle_xyz789"
  }'

This REST API provides full programmatic access to all FlowMQ features, enabling you to build powerful messaging applications and integrations.