Connect FlowMQ with the Java SDK
The Java bindings of the Flow SDK are designed to be robust and high-performance, and they integrate with Java's asynchronous programming model through CompletableFuture.
Prerequisites
- Java 8+: The SDK leverages features like CompletableFuture and lambdas.
- Maven or Gradle: For dependency management.
- Running FlowMQ Instance: You need access to a FlowMQ broker. For this guide, we'll assume it's running at localhost:7070.
Installation
The Java Flow SDK is available from Maven Central.
Maven
Add the following dependency to your pom.xml:
<dependency>
    <groupId>com.flowmq</groupId>
    <artifactId>flowmq-sdk</artifactId>
    <version>LATEST_VERSION</version>
</dependency>
Replace LATEST_VERSION with the latest version number.
Gradle
Add the following to your build.gradle file:
implementation 'com.flowmq:flowmq-sdk:LATEST_VERSION'
Core Concepts
- FlowClient: The main entry point for interacting with FlowMQ. It's a thread-safe, heavyweight object designed to be created once and shared. It manages the underlying gRPC connection pool.
- FlowClientConfig: A configuration object, created with a builder pattern, to set up the FlowClient.
- Authentication: A helper class for providing credentials.
- Producer: An object created from the FlowClient for publishing messages.
- Consumer: An object responsible for receiving messages from queues or streams.
- Message: A data object representing a message, containing a payload (byte[]) and headers (Map<String, String>).
- Queue & Stream: Objects that define the source you want to consume messages from.
- CompletableFuture: Used extensively for all asynchronous network operations.
Establishing a Connection
A FlowClient is created using a FlowClientConfig. It's crucial to manage its lifecycle correctly, as it holds network resources. The FlowClient implements AutoCloseable, so using a try-with-resources block is the recommended approach.
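To make the lifecycle point concrete, here is a minimal sketch that uses only the JDK. DemoClient is a hypothetical stand-in for any AutoCloseable resource and is not the FlowMQ API. It shows that try-with-resources calls close() whether the body succeeds or throws.

```java
// Hypothetical stand-in for a resource-holding client; not the FlowMQ API.
final class DemoClient implements AutoCloseable {
    private boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    void send(String payload) {
        if (closed) {
            throw new IllegalStateException("client is closed");
        }
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    @Override
    public void close() {
        closed = true; // a real client would release its connections here
    }
}

final class LifecycleSketch {
    // Runs one send inside try-with-resources and reports whether the client was closed afterwards.
    static boolean closedAfterUse(String payload) {
        DemoClient observed = new DemoClient();
        try (DemoClient client = observed) {
            client.send(payload);
        } catch (IllegalArgumentException e) {
            // close() has already run by the time this catch block executes
        }
        return observed.isClosed();
    }

    public static void main(String[] args) {
        System.out.println(closedAfterUse("hello")); // true
        System.out.println(closedAfterUse(""));      // true, closed despite the exception
    }
}
```

Because the resource is closed before any catch block runs, cleanup code does not need to be repeated in the error path.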
Configuration and Creation
import com.flowmq.sdk.FlowClient;
import com.flowmq.sdk.FlowClientConfig;
import com.flowmq.sdk.auth.Authentication;

public class ConnectionExample {
    public static void main(String[] args) {
        FlowClientConfig config = FlowClientConfig.builder()
                .brokerAddress("localhost:7070")
                .authentication(Authentication.token("YOUR_API_TOKEN"))
                .build();

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Successfully connected to FlowMQ!");
            // Your application logic (publishing/consuming) goes here.
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Replace YOUR_API_TOKEN with your actual credentials.
Publishing Messages
To publish messages, you create a Producer from the client. Publishing is an asynchronous operation that returns a CompletableFuture<PublishReceipt>.
Example: Publishing a Message to a Topic
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class PublishExample {
    public static void main(String[] args) throws Exception {
        FlowClientConfig config = FlowClientConfig.builder()
                .brokerAddress("localhost:7070")
                .authentication(Authentication.token("YOUR_API_TOKEN"))
                .build();

        try (FlowClient client = FlowClient.create(config)) {
            // 1. Create a producer
            Producer producer = client.newProducer();

            // 2. Construct a message (encode explicitly; the default charset varies by platform)
            Message message = Message.builder()
                    .payload("Hello from Java!".getBytes(StandardCharsets.UTF_8))
                    .header("content-type", "text/plain")
                    .build();

            // 3. Publish asynchronously
            CompletableFuture<PublishReceipt> future = producer.publishAsync("sensor.v1.data", message);
            future.whenComplete((receipt, ex) -> {
                if (ex != null) {
                    System.err.println("Failed to publish message: " + ex.getMessage());
                } else {
                    System.out.println("Message published successfully! ID: " + receipt.getMessageId());
                }
            });

            // Block for the example to finish; join() rethrows a failure as CompletionException
            future.join();
        }
    }
}
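The example above blocks with join(), which waits indefinitely. As a hedged alternative, the sketch below uses only standard java.util.concurrent calls to wait with a timeout and to tell apart success, failure, and timeout. The futures here are plain CompletableFuture<String> stand-ins, not values returned by the FlowMQ SDK, and BoundedWaitSketch is a name made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {
    // Waits up to timeoutMillis for the future and maps each outcome to a short status string.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "ok: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the exception the operation threw
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("msg-1");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed

        System.out.println(await(done, 100));    // ok: msg-1
        System.out.println(await(failed, 100));  // failed: broker unavailable
        System.out.println(await(pending, 100)); // timed out
    }
}
```

Note the difference in wrapping: get() reports failures as ExecutionException, while join() uses the unchecked CompletionException.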
Consuming Messages
You can consume messages from either a Queue or a Stream by providing a message handler as a lambda.
Consuming from a Queue
When consuming from a queue, you must explicitly acknowledge each message by calling ack() after successful processing. This signals FlowMQ to remove the message from the queue.
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class QueueConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        FlowClientConfig config = FlowClientConfig.builder()
                .brokerAddress("localhost:7070")
                .authentication(Authentication.token("YOUR_API_TOKEN"))
                .build();

        // Define the queue to consume from
        Queue taskQueue = new Queue("image.processing.tasks");

        // Use a latch to keep the main thread alive for the example
        CountDownLatch latch = new CountDownLatch(1);

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Starting consumer on queue: " + taskQueue.getName() + "...");

            // The consume method runs the handler in the background
            client.consume(taskQueue, (message) -> {
                // Decode explicitly; the default charset varies by platform
                System.out.println("Received from queue: " + new String(message.getPayload(), StandardCharsets.UTF_8));
                // Process the message...
                System.out.println("Processing complete. Acknowledging message.");
                message.ack();
                latch.countDown(); // Release latch for this example
            });

            latch.await(); // Wait for a message to be processed
        }
    }
}
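The rule of acknowledging only after successful processing can be factored into a small wrapper. The following is a minimal sketch, assuming a hypothetical DemoMessage stand-in rather than the SDK's Message class; Consumer here is java.util.function.Consumer, not the FlowMQ Consumer concept. What the broker does with a message that is never acknowledged is not covered by this guide, so the sketch only logs the failure.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical stand-in for an acknowledgeable message; not the FlowMQ Message class.
final class DemoMessage {
    private final byte[] payload;
    private boolean acked = false;

    DemoMessage(String text) {
        this.payload = text.getBytes(StandardCharsets.UTF_8);
    }

    String text() {
        return new String(payload, StandardCharsets.UTF_8);
    }

    void ack() {
        acked = true;
    }

    boolean isAcked() {
        return acked;
    }
}

final class AckSketch {
    // Wraps processing logic so that ack() is called only when processing returns normally.
    static Consumer<DemoMessage> ackOnSuccess(Consumer<String> processor) {
        return message -> {
            try {
                processor.accept(message.text());
                message.ack();
            } catch (RuntimeException e) {
                // Leave the message unacknowledged; broker-side handling is outside this sketch.
                System.err.println("Processing failed: " + e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        Consumer<DemoMessage> handler = ackOnSuccess(text -> {
            if (text.isEmpty()) {
                throw new IllegalArgumentException("empty task");
            }
        });

        DemoMessage good = new DemoMessage("resize image 42");
        DemoMessage bad = new DemoMessage("");
        handler.accept(good);
        handler.accept(bad);

        System.out.println(good.isAcked()); // true
        System.out.println(bad.isAcked());  // false
    }
}
```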
Consuming from a Stream
When consuming from a stream, you commit offsets to track your progress. The StartPosition determines where the consumer begins reading in the stream.
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import com.flowmq.sdk.streams.Stream;
import com.flowmq.sdk.streams.StartPosition;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class StreamConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        FlowClientConfig config = FlowClientConfig.builder()
                .brokerAddress("localhost:7070")
                .authentication(Authentication.token("YOUR_API_TOKEN"))
                .build();

        // Define the stream and where to start consuming
        Stream eventStream = new Stream("user.activity.events", StartPosition.EARLIEST);

        // Use a latch to keep the main thread alive for the example
        CountDownLatch latch = new CountDownLatch(1);

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Starting consumer on stream: " + eventStream.getName() + "...");

            client.consume(eventStream, (message) -> {
                // Decode explicitly; the default charset varies by platform
                System.out.println("Received from stream (offset: " + message.getOffset() + "): "
                        + new String(message.getPayload(), StandardCharsets.UTF_8));
                // Process the message...
                System.out.println("Processing complete. Committing offset.");
                message.commitOffset();
                latch.countDown();
            });

            latch.await(); // Wait for a message to be processed
        }
    }
}
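To illustrate what tracking progress with offsets means, here is a small in-memory model. It assumes that a committed offset marks the last handled record and that a later read resumes just after it. That resume behavior is an assumption made for illustration, and DemoStream is hypothetical rather than part of the FlowMQ SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model of offset tracking; hypothetical, not part of the FlowMQ SDK.
final class DemoStream {
    private final List<String> records;
    private long committedOffset = -1; // -1 means nothing has been committed yet

    DemoStream(List<String> records) {
        this.records = new ArrayList<>(records);
    }

    // Handles up to maxRecords starting after the last committed offset,
    // committing each offset once its record has been handled.
    List<String> consume(int maxRecords) {
        List<String> handled = new ArrayList<>();
        long next = committedOffset + 1;
        while (next < records.size() && handled.size() < maxRecords) {
            handled.add(next + ":" + records.get((int) next));
            committedOffset = next; // commit only after the record was handled
            next++;
        }
        return handled;
    }

    long committedOffset() {
        return committedOffset;
    }
}

final class OffsetSketch {
    public static void main(String[] args) {
        DemoStream stream = new DemoStream(Arrays.asList("login", "click", "logout"));
        System.out.println(stream.consume(2));        // [0:login, 1:click]
        System.out.println(stream.consume(2));        // [2:logout]
        System.out.println(stream.committedOffset()); // 2
    }
}
```

In this model the second call picks up at offset 2 because offsets 0 and 1 were already committed, which is the property that lets a consumer continue without reprocessing.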
Full End-to-End Example
This example combines publishing and consuming to demonstrate a complete workflow.
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FullFlowExample {
    public static void main(String[] args) throws Exception {
        FlowClientConfig config = FlowClientConfig.builder()
                .brokerAddress("localhost:7070")
                .authentication(Authentication.token("YOUR_API_TOKEN"))
                .build();

        try (FlowClient client = FlowClient.create(config)) {
            // 1. Consumer setup (runs in the background)
            // (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
            Queue e2eQueue = new Queue("e2e.queue");
            CountDownLatch messageReceivedLatch = new CountDownLatch(1);

            client.consume(e2eQueue, (message) -> {
                System.out.println("Received message: " + new String(message.getPayload(), StandardCharsets.UTF_8));
                message.ack();
                messageReceivedLatch.countDown();
            });
            System.out.println("Consumer started and waiting for messages...");

            // 2. Producer: publish a message
            Producer producer = client.newProducer();
            Message message = Message.builder()
                    .payload("End-to-End Test Message!".getBytes(StandardCharsets.UTF_8))
                    .build();

            producer.publishAsync("e2e.topic", message).whenComplete((receipt, ex) -> {
                if (ex != null) {
                    System.err.println("Publish failed: " + ex.getMessage());
                } else {
                    System.out.println("Message published successfully. ID: " + receipt.getMessageId());
                }
            }).join(); // Block until publish is complete for simplicity

            // 3. Wait for the consumer to receive the message
            if (messageReceivedLatch.await(10, TimeUnit.SECONDS)) {
                System.out.println("Successfully received and acknowledged message.");
            } else {
                System.err.println("Timed out waiting for message.");
            }
        }
    }
}
Additional Resources
- Error Handling: Chain .exceptionally() calls onto your CompletableFutures to gracefully handle failures.
- Advanced Configuration: Explore the FlowClientConfig.builder() for options like connection timeouts, request timeouts, and TLS settings.
- Flow SDK Overview: For a broader understanding of the SDK's architecture and concepts, refer to the Flow SDK Developer Guide.
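The error-handling advice above can be sketched with the standard CompletableFuture API alone. In this minimal example the futures are plain CompletableFuture<String> stand-ins rather than values returned by the SDK, and ExceptionallySketch and withFallback are names invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Converts a failed future into a fallback value so downstream stages keep running.
    static CompletableFuture<String> withFallback(CompletableFuture<String> source) {
        return source
                .thenApply(id -> "published " + id)
                .exceptionally(ex -> "not published: " + rootMessage(ex));
    }

    // Dependent stages see failures wrapped in CompletionException; unwrap to the original cause.
    private static String rootMessage(Throwable ex) {
        Throwable cause = ex;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause.getMessage();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("msg-7");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection refused"));

        System.out.println(withFallback(ok).join());     // published msg-7
        System.out.println(withFallback(failed).join()); // not published: connection refused
    }
}
```

Because exceptionally() returns a normally completed future, the final join() does not throw in either case.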