Connect FlowMQ with AMQP Java Client

FlowMQ natively supports the AMQP 0.9.1 protocol, so Java applications can connect to it with a standard AMQP 0.9.1 client library. This guide walks you through using the official RabbitMQ Java client to connect to FlowMQ, declare a queue, publish messages, and consume them.

Prerequisites

Before you start, make sure you have the following:

  • A running instance of FlowMQ.
  • A Java Development Kit (JDK) 8 or later.
  • A dependency management tool like Maven or Gradle.

Dependencies

Add the RabbitMQ AMQP client to your project's dependencies.

Maven

Add the following to your pom.xml:

xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

Gradle

Add the following to your build.gradle:

groovy
implementation 'com.rabbitmq:amqp-client:5.14.2'

Connecting to FlowMQ

First, establish a connection to your FlowMQ instance and create a channel. All AMQP operations are done on a channel.

java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class AmqpJavaClient {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // factory.setUsername("guest");
        // factory.setPassword("guest");
        // factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // ... declare, publish, and consume logic here ...
        }
    }
}
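
In practice the host, port, and credentials usually come from configuration rather than from literals in the source. The following is a minimal sketch of one way to do that, using only the JDK. The class name `FlowMqSettings` and the `FLOWMQ_*` variable names are hypothetical, and the defaults simply mirror the commented-out values in the snippet above; check your FlowMQ deployment for the real ones.

```java
import java.util.Map;

/** Connection settings resolved from environment-style key/value pairs. */
final class FlowMqSettings {
    final String host;
    final int port;
    final String username;
    final String password;

    private FlowMqSettings(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    /** Reads hypothetical FLOWMQ_* keys, falling back to the values used in this guide. */
    static FlowMqSettings fromEnv(Map<String, String> env) {
        String host = env.getOrDefault("FLOWMQ_HOST", "localhost");
        int port = Integer.parseInt(env.getOrDefault("FLOWMQ_PORT", "5672"));
        String username = env.getOrDefault("FLOWMQ_USERNAME", "guest");
        String password = env.getOrDefault("FLOWMQ_PASSWORD", "guest");
        return new FlowMqSettings(host, port, username, password);
    }

    public static void main(String[] args) {
        FlowMqSettings s = FlowMqSettings.fromEnv(System.getenv());
        // With the RabbitMQ client on the classpath, apply the values to the factory:
        //   factory.setHost(s.host);
        //   factory.setPort(s.port);
        //   factory.setUsername(s.username);
        //   factory.setPassword(s.password);
        System.out.println("Connecting to " + s.host + ":" + s.port + " as " + s.username);
    }
}
```

Passing the environment in as a `Map` keeps the lookup easy to exercise without touching real process variables.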

Declaring an Exchange and Queue

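Messages are published to exchanges, which route them to queues. Here we declare only a queue and, for simplicity, rely on the default exchange: a pre-declared direct exchange with an empty name, to which AMQP 0.9.1 binds every queue under the queue's own name.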

java
// Inside the try-with-resources block.
// Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
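
To make the default-exchange rule concrete, here is a toy in-memory model of it in plain Java. It is an illustration only, not FlowMQ or RabbitMQ client code: `DefaultExchangeModel` and its methods are hypothetical names, and the model assumes standard AMQP 0.9.1 behaviour, where a message published without the mandatory flag to a key that matches no queue is discarded.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of default-exchange routing: the routing key is matched against queue names. */
final class DefaultExchangeModel {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declaring is idempotent: an existing queue and its messages are left untouched. */
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    /** Delivers to the queue named by the routing key; returns false if no such queue exists. */
    boolean publish(String routingKey, String body) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // unroutable: the message is dropped
        }
        queue.add(body);
        return true;
    }

    /** Removes and returns the oldest message, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeModel exchange = new DefaultExchangeModel();
        exchange.queueDeclare("hello");
        System.out.println(exchange.publish("hello", "first")); // true
        System.out.println(exchange.publish("helo", "lost"));   // false: no queue named "helo"
        System.out.println(exchange.poll("hello"));             // first
    }
}
```

The practical consequence is that a typo in the routing key does not raise an error on publish; the message simply never reaches the queue.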

Publishing a Message

Now you can publish a message. Since we are using the default exchange, the routing key must be the name of the queue.

java
String message = "Hello FlowMQ from Java!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

Consuming Messages

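To consume messages, register a DeliverCallback, which the client invokes once for each delivery. In the basicConsume call below, the second argument (true) enables automatic acknowledgement, and the last argument is a callback invoked if the consumer is cancelled.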

java
// Add this import alongside the others at the top of the file.
import com.rabbitmq.client.DeliverCallback;

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};

channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
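
AMQP message bodies are plain byte arrays, so the publisher and the consumer have to agree on a text encoding. The sketch below shows the round trip in isolation using `StandardCharsets.UTF_8`, which, unlike the string-named `"UTF-8"` overloads, does not throw a checked `UnsupportedEncodingException`. The `BodyCodec` helper is a hypothetical name introduced for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Encodes and decodes message bodies with one explicit charset on both sides. */
final class BodyCodec {
    /** Producer side: the bytes to pass as the body argument of basicPublish. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: the text recovered from delivery.getBody(). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "h\u00e9llo"; // contains one non-ASCII character
        byte[] body = encode(original);
        System.out.println(body.length);                    // 6: the accented letter takes two bytes
        System.out.println(decode(body).equals(original));  // true
    }
}
```

Calling `getBytes()` with no argument uses the platform default charset, which can differ between the publishing and consuming hosts and corrupt non-ASCII text.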

Full Example

Here is a pair of complete examples: one for publishing and one for consuming.

Publisher (Send.java)

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // Encode explicitly as UTF-8 so the bytes match what the consumer decodes.
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Consumer (Recv.java)

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
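        // No try-with-resources here: the connection and channel must stay open
        // so the consumer keeps receiving deliveries until the process is stopped.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();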

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

Additional Resources