Connect FlowMQ with AMQP Java Client
FlowMQ natively supports the AMQP 0.9.1 protocol, so Java developers can build message-oriented applications with the official RabbitMQ Java client. This guide walks you through connecting to FlowMQ, declaring a queue, publishing messages, and consuming them.
Prerequisites
Before you start, make sure you have the following:
- A running instance of FlowMQ.
- A Java Development Kit (JDK) 8 or later.
- A dependency management tool like Maven or Gradle.
Dependencies
Add the RabbitMQ AMQP client to your project's dependencies.
Maven
Add the following to your pom.xml:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>
Gradle
Add the following to your build.gradle:
implementation 'com.rabbitmq:amqp-client:5.14.2'
Connecting to FlowMQ
First, establish a connection to your FlowMQ instance and create a channel. All AMQP operations are done on a channel.
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class AmqpJavaClient {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Uncomment to override the client defaults (guest/guest on port 5672):
        // factory.setUsername("guest");
        // factory.setPassword("guest");
        // factory.setPort(5672);

        // try-with-resources closes the channel and connection when the block exits
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // ... declare and publish logic here; a long-running consumer
            // should keep the connection open instead (see Recv.java) ...
        }
    }
}
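The host, port, and credentials above can also be carried in a single AMQP URI, which the RabbitMQ client accepts through ConnectionFactory.setUri(String). The following is a minimal, JDK-only sketch of a helper that assembles such a URI. The class name AmqpUri and its build method are hypothetical, and the sketch assumes FlowMQ follows the usual AMQP 0.9.1 conventions for credentials and virtual hosts. Credentials and the virtual host are percent-encoded, so the default virtual host "/" becomes "%2F".

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper (not part of the RabbitMQ client): builds an AMQP URI string.
final class AmqpUri {

    // Assembles amqp://user:password@host:port/vhost with percent-encoded parts.
    static String build(String user, String password, String host, int port, String vhost) {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(vhost);
    }

    // URLEncoder targets HTML forms and turns spaces into '+',
    // so '+' is rewritten to %20 for use inside a URI.
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints: amqp://guest:guest@localhost:5672/%2F
        System.out.println(build("guest", "guest", "localhost", 5672, "/"));
    }
}
```

The resulting string can be passed to factory.setUri(...) in place of the individual setHost, setPort, setUsername, and setPassword calls.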
Declaring an Exchange and Queue
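Messages are published to exchanges, which route them to queues. For simplicity, this guide uses the default exchange, a pre-declared direct exchange with an empty name, so only the queue needs to be declared.
// Inside the try-with-resources block
// Arguments: queue name, durable, exclusive, autoDelete, additional arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);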
Publishing a Message
Now you can publish a message. Passing an empty string as the exchange name selects the default exchange, so the routing key must be the name of the queue.
String message = "Hello FlowMQ from Java!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
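To make the routing rule concrete, here is a toy in-memory model of the default exchange. It is only an illustration, not FlowMQ's implementation: the class DefaultExchangeModel and its methods are hypothetical, and it assumes the AMQP 0.9.1 behavior in which a message published without the mandatory flag is dropped when no queue matches the routing key.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy model: the default exchange delivers a message to the
// queue whose name is exactly equal to the routing key.
final class DefaultExchangeModel {
    private final Map<String, Queue<byte[]>> queues = new HashMap<>();

    // Declaring a queue twice is harmless; the existing queue is kept.
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    // Returns true if the message was routed, false if it was dropped.
    boolean publish(String routingKey, byte[] body) {
        Queue<byte[]> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // no queue with that name: the message is unroutable
        }
        queue.add(body);
        return true;
    }

    // Number of messages waiting in the named queue (0 if it does not exist).
    int depth(String name) {
        Queue<byte[]> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeModel exchange = new DefaultExchangeModel();
        exchange.queueDeclare("hello");
        System.out.println(exchange.publish("hello", new byte[] {1})); // true
        System.out.println(exchange.publish("helo", new byte[] {1}));  // false (typo in key)
        System.out.println(exchange.depth("hello"));                   // 1
    }
}
```

The practical takeaway is that a misspelled routing key does not raise an error on the publisher side in this model; the message simply never reaches a queue.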
Consuming Messages
To consume messages, register a DeliverCallback that processes each incoming delivery.
import com.rabbitmq.client.DeliverCallback;
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
// The second argument (true) enables automatic acknowledgement of deliveries;
// the last argument is a callback invoked if the consumer is cancelled.
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
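A message body travels as raw bytes, so the publisher and consumer must agree on a character encoding. The sketch below uses only the JDK to show a UTF-8 round trip and what happens when the consumer decodes with a different charset. The class name BodyCodec is hypothetical; StandardCharsets.UTF_8 is used in place of the "UTF-8" string because it avoids a checked exception.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps encoding and decoding of message bodies in one place.
final class BodyCodec {

    // What the publisher would pass to basicPublish.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer would apply to delivery.getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "h\u00e9llo"; // contains one non-ASCII character
        byte[] body = encode(original);

        System.out.println(body.length);                   // 6 (the accented letter takes two bytes)
        System.out.println(decode(body).equals(original)); // true

        // Decoding with a mismatched charset silently corrupts the text.
        String mismatched = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println(mismatched.equals(original));   // false
    }
}
```

Calling getBytes() without an argument uses the platform default charset, which may differ between the publishing and consuming hosts, so naming the charset on both sides is the safer choice.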
Full Example
Here is a pair of complete examples: one for publishing and one for consuming.
Publisher (Send.java)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Redeclaring a queue with the same arguments is harmless,
            // so the publisher and the consumer can both declare it
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // Encode explicitly as UTF-8 to match the consumer's decoding
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
Consumer (Recv.java)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // No try-with-resources here: the connection and channel stay open
        // so the consumer keeps receiving deliveries until the process exits
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // true enables automatic acknowledgement of deliveries
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
Additional Resources
- For more advanced scenarios, refer to the official RabbitMQ Java Client documentation.
- Explore FlowMQ's advanced features for AMQP integration.