Connect FlowMQ with Kafka Java SDK
1. Introduction
FlowMQ natively supports the Apache Kafka protocol, allowing you to leverage your existing Kafka knowledge and applications. This means you can use the standard kafka-clients Java library to interact with FlowMQ as if it were a Kafka cluster. This guide covers the basic setup for connecting, producing, and consuming messages.
2. Prerequisites
Before you begin, ensure you have the following:
- A running, accessible FlowMQ instance.
- The FlowMQ Kafka listener endpoint (e.g., your-flowmq-host:9092).
- Java Development Kit (JDK) 8 or newer installed.
- A Maven or Gradle project.
- A basic understanding of Kafka concepts like topics, producers, and consumers.
3. Adding Kafka Client Dependency
Add the kafka-clients library to your Java project.
Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version> <!-- Or your desired Kafka client version -->
</dependency>
Gradle:
implementation 'org.apache.kafka:kafka-clients:3.6.0' // Or your desired Kafka client version
Remember to replace 3.6.0 with the specific version of kafka-clients you intend to use, compatible with your FlowMQ instance.
4. Configuration Properties
To connect your Kafka client to FlowMQ, you'll need to set a few essential configuration properties:
import java.util.Properties;
// Common properties for both Producer and Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "your-flowmq-host:9092"); // Replace with your FlowMQ Kafka endpoint
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // For Producer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // For Producer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // For Consumer
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // For Consumer
// Example security configuration (if FlowMQ requires SSL/TLS - adapt as needed)
// props.put("security.protocol", "SSL");
// props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
// props.put("ssl.truststore.password", "truststore_password");
// props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
// props.put("ssl.keystore.password", "keystore_password");
// props.put("ssl.key.password", "key_password");
- bootstrap.servers: Crucial; this points your client at the FlowMQ Kafka listener.
- Serializers/Deserializers: Specify how message keys and values are converted to bytes and back. StringSerializer/StringDeserializer are common for text-based messages.
Adjust the security properties if your FlowMQ instance uses TLS/SSL or another authentication mechanism, as sketched below.
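If FlowMQ is configured for SASL authentication, the client setup might look like the following sketch. The SASL/PLAIN mechanism and the credentials shown are assumptions, not confirmed FlowMQ settings; check which mechanisms your instance actually supports:
// Hypothetical SASL/PLAIN-over-TLS setup - the mechanism and credentials
// are placeholders; consult your FlowMQ documentation.
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"your-username\" password=\"your-password\";");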
5. Creating a Kafka Topic
Before producing messages to a topic, you need to ensure it exists. FlowMQ might be configured to auto-create topics when a producer first tries to send a message to a non-existent topic. However, it's often good practice to create topics explicitly.
You can use the Kafka AdminClient API for this:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class FlowMQCreateTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "your-flowmq-host:9092"); // FlowMQ endpoint

        try (AdminClient adminClient = AdminClient.create(props)) {
            String topicName = "my-flowmq-topic";
            int numPartitions = 1; // Adjust as needed
            short replicationFactor = 1; // FlowMQ might handle replication differently; consult FlowMQ docs.
                                         // For standard Kafka, this would be <= number of brokers.
            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

            // Check if topic already exists (optional, to avoid errors if re-running)
            if (!adminClient.listTopics().names().get().contains(topicName)) {
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
                System.out.printf("Topic '%s' created successfully.%n", topicName);
            } else {
                System.out.printf("Topic '%s' already exists.%n", topicName);
            }
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Error creating topic: " + e.getMessage());
            // Handle exceptions, e.g., if topic creation failed or it already exists with different config
            if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
                System.err.printf("Topic '%s' already exists (caught specific exception).%n", "my-flowmq-topic");
            } else {
                e.printStackTrace();
            }
        }
    }
}
Explanation:
- Properties: Configure bootstrap.servers for the AdminClient.
- AdminClient: Instantiate the client. It's auto-closeable.
- NewTopic: Define the new topic's name, number of partitions, and replication factor.
- Partitions: Determine the parallelism of consumption.
- Replication Factor: For fault tolerance. FlowMQ might manage this differently than standard Kafka; refer to FlowMQ documentation for best practices on replication. For this example, 1 is used.
- adminClient.createTopics(): Sends the request to create the topic(s). .all().get() waits for the operation to complete.
- Topic Existence Check: The example includes an optional check to see if the topic already exists to prevent errors if the code is run multiple times.
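If you want to confirm how FlowMQ actually registered the topic (for example, the partition count), the same AdminClient can describe it. A minimal sketch, reusing props from the example above; it also needs an import for org.apache.kafka.clients.admin.TopicDescription:
// Sketch: inspect the topic after creation (reuses props from above).
try (AdminClient adminClient = AdminClient.create(props)) {
    TopicDescription description = adminClient
            .describeTopics(Collections.singleton("my-flowmq-topic"))
            .allTopicNames().get()              // waits for the broker's response
            .get("my-flowmq-topic");
    System.out.printf("Topic '%s' has %d partition(s).%n",
            description.name(), description.partitions().size());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("Error describing topic: " + e.getMessage());
}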
Alternatively, if your FlowMQ setup supports the kafka-topics.sh command-line tool, you can also create topics from the shell:
# kafka-topics.sh --bootstrap-server your-flowmq-host:9092 --create --topic my-flowmq-topic --partitions 1 --replication-factor 1
Consult your FlowMQ documentation for the recommended way to manage topics, including auto-creation settings and replication strategies.
6. Creating a Kafka Producer
Here’s how to create a simple Kafka producer that sends a message to a topic in FlowMQ:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class FlowMQProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-flowmq-host:9092"); // FlowMQ endpoint
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Add other producer configs like acks, retries if needed

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "my-flowmq-topic";
        String key = "message-key-1";
        String value = "Hello from Kafka client to FlowMQ!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        try {
            // Asynchronous send with a callback for result handling
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Message sent successfully to topic %s, partition %d, offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    System.err.println("Failed to send message: " + exception.getMessage());
                }
            });
            future.get(); // Block for simplicity in this example; usually handled asynchronously
        } catch (Exception e) {
            System.err.println("Error sending message: " + e.getMessage());
        } finally {
            producer.flush(); // Ensure all outstanding messages are sent
            producer.close(); // Close the producer
        }
    }
}
Explanation:
- Properties: Configure bootstrap.servers and serializers.
- KafkaProducer: Instantiate the producer with the properties.
- ProducerRecord: Create the message specifying topic, key, and value.
- producer.send(): Send the message. The example uses an asynchronous send with a callback to handle success or failure. future.get() is used here to wait for the send to complete for simplicity; in real applications, you'd manage these futures more asynchronously.
- producer.close(): Always close the producer to release resources.
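The example leaves acks and retries at their defaults, as noted in the code. If delivery guarantees matter, the following sketch shows commonly used reliability options; these are standard kafka-clients settings, so confirm with the FlowMQ documentation that your instance honors the corresponding semantics:
// Reliability-oriented producer settings (standard kafka-clients options;
// verify FlowMQ's support for acknowledgment and idempotence semantics).
props.put(ProducerConfig.ACKS_CONFIG, "all");                   // wait for full acknowledgment
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);    // retry transient failures
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");    // avoid duplicates on retry
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000"); // upper bound on total send time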
7. Creating a Kafka Consumer
Here’s how to create a Kafka consumer to read messages from a topic in FlowMQ:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class FlowMQConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-flowmq-host:9092"); // FlowMQ endpoint
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-flowmq-consumer-group"); // Essential for consumer groups
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start consuming from the beginning if no offset is stored
        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // For manual commit

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "my-flowmq-topic";
        consumer.subscribe(Collections.singletonList(topic)); // Subscribe to the topic
        System.out.println("Consumer started, polling for messages from topic: " + topic);

        try {
            while (true) { // Continuous polling loop
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // Poll for new messages
                if (records.isEmpty()) {
                    // System.out.println("No messages received in this poll.");
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key=%s, value=%s, topic=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.topic(), record.partition(), record.offset());
                }
                // If using manual commit:
                // consumer.commitAsync(); // or consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Error during consumption: " + e.getMessage());
        } finally {
            consumer.close(); // Close the consumer
            System.out.println("Consumer closed.");
        }
    }
}
Explanation:
- Properties: Configure bootstrap.servers, deserializers, and importantly, group.id.
- KafkaConsumer: Instantiate the consumer.
- consumer.subscribe(): Subscribe to one or more topics.
- Polling Loop: Call consumer.poll() repeatedly to fetch messages.
- Processing Records: Iterate over the ConsumerRecords to process each message.
- Offset Committing: By default, offsets are committed automatically. For manual control, set enable.auto.commit to false and use consumer.commitSync() or consumer.commitAsync(), as sketched after this list.
- consumer.close(): Always close the consumer.
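For the manual-commit variant mentioned above, the poll loop changes only slightly. A minimal sketch, assuming enable.auto.commit has been set to false in the consumer properties; processRecord is a hypothetical stand-in for your own handling logic:
// Manual-commit variant of the poll loop.
// Requires: props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // hypothetical handler for your business logic
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit offsets only after processing succeeded
    }
}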
8. Running the Examples
- Save the producer and consumer code into .java files (e.g., FlowMQProducer.java, FlowMQConsumer.java).
- Compile them using your JDK: javac -cp path/to/kafka-clients.jar YourFile.java (or use Maven/Gradle build).
- Run the consumer first, then the producer.
- java -cp .:path/to/kafka-clients.jar:path/to/slf4j-api.jar:path/to/slf4j-simple.jar FlowMQConsumer
- java -cp .:path/to/kafka-clients.jar:path/to/slf4j-api.jar:path/to/slf4j-simple.jar FlowMQProducer
(Note: You'll also need SLF4J API and an implementation like slf4j-simple.jar in your classpath for Kafka clients to log correctly. Adjust classpath based on your setup.)
You should see logs indicating the producer sent a message and the consumer received it.
9. Troubleshooting Tips
- Connection Refused: Double-check the bootstrap.servers address and port. Ensure FlowMQ is running and its Kafka listener is active. Check network connectivity and firewalls.
- Serialization Errors: Ensure the key.serializer/value.serializer in the producer match the key.deserializer/value.deserializer in the consumer, and that they correctly represent your message data types.
- Topic Not Found / Messages Not Received: Verify the topic name is correct. Ensure the topic exists in FlowMQ, or that FlowMQ is configured for automatic topic creation if you expect that behavior. For consumers, check group.id and auto.offset.reset policy.
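When chasing connection problems, it can help to verify reachability independently of your producer or consumer code. A minimal sketch using the AdminClient (the endpoint is a placeholder for your FlowMQ listener):
// Quick connectivity check against the FlowMQ Kafka listener.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "your-flowmq-host:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); // fail fast instead of hanging

try (AdminClient adminClient = AdminClient.create(props)) {
    // describeCluster() only succeeds if the listener is reachable and speaks the Kafka protocol.
    int nodes = adminClient.describeCluster().nodes().get().size();
    System.out.println("Connection OK; nodes visible: " + nodes);
} catch (Exception e) {
    System.err.println("Connection check failed: " + e.getMessage());
}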
10. Next Steps / Further Reading
- For more details on FlowMQ's protocol support, see the "Supported Protocols in FlowMQ" chapter of your FlowMQ documentation.
- For comprehensive information on Kafka Java client configurations and advanced features, consult the official Apache Kafka Documentation.