Skip to content

Connect FlowMQ with the Java SDK

The Java bindings of Flow SDK are built to be robust, high-performance, and integrate seamlessly with modern Java's asynchronous programming features using CompletableFuture.

Prerequisites

  • Java 8+: The SDK leverages features like CompletableFuture and lambdas.
  • Maven or Gradle: For dependency management.
  • Running FlowMQ Instance: You need access to a FlowMQ broker. For this guide, we'll assume it's running at localhost:7070.

Installation

The Java Flow SDK is available from Maven Central.

Maven

Add the following dependency to your pom.xml:

xml
<dependency>
    <groupId>com.flowmq</groupId>
    <artifactId>flowmq-sdk</artifactId>
    <version>LATEST_VERSION</version>
</dependency>

Replace LATEST_VERSION with the latest version number.

Gradle

Add the following to your build.gradle file:

groovy
implementation 'com.flowmq:flowmq-sdk:LATEST_VERSION'

Core Concepts

  • FlowClient: The main entry point for interacting with FlowMQ. It's a thread-safe, heavyweight object designed to be created once and shared. It manages the underlying gRPC connection pool.
  • FlowClientConfig: A configuration object, created with a builder pattern, to set up the FlowClient.
  • Authentication: A helper class for providing credentials.
  • Producer: An object created from the FlowClient for publishing messages.
  • Consumer: An object responsible for receiving messages from queues or streams.
  • Message: A data object representing a message, containing a payload (byte[]) and headers (Map<String, String>).
  • Queue & Stream: Objects that define the source you want to consume messages from.
  • CompletableFuture: Used extensively for all asynchronous network operations.

Establishing a Connection

A FlowClient is created using a FlowClientConfig. It's crucial to manage its lifecycle correctly, as it holds network resources. The FlowClient implements AutoCloseable, so using a try-with-resources block is the recommended approach.

Configuration and Creation

java
import com.flowmq.sdk.FlowClient;
import com.flowmq.sdk.FlowClientConfig;
import com.flowmq.sdk.auth.Authentication;

public class ConnectionExample {
    public static void main(String[] args) {
        FlowClientConfig config = FlowClientConfig.builder()
            .brokerAddress("localhost:7070")
            .authentication(Authentication.token("YOUR_API_TOKEN"))
            .build();

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Successfully connected to FlowMQ!");
            // Your application logic (publishing/consuming) goes here.
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Replace YOUR_API_TOKEN with your actual credentials.

Publishing Messages

To publish messages, you create a Producer from the client. Publishing is an asynchronous operation that returns a CompletableFuture<PublishReceipt>.

Example: Publishing a Message to a Topic

java
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.util.concurrent.CompletableFuture;

public class PublishExample {
    public static void main(String[] args) throws Exception {
        FlowClientConfig config = FlowClientConfig.builder()
            .brokerAddress("localhost:7070")
            .authentication(Authentication.token("YOUR_API_TOKEN"))
            .build();

        try (FlowClient client = FlowClient.create(config)) {
            // 1. Create a producer
            Producer producer = client.newProducer();

            // 2. Construct a message
            Message message = Message.builder()
                .payload("Hello from Java!".getBytes())
                .header("content-type", "text/plain")
                .build();

            // 3. Publish asynchronously
            CompletableFuture<PublishReceipt> future = producer.publishAsync("sensor.v1.data", message);

            future.whenComplete((receipt, ex) -> {
                if (ex != null) {
                    System.err.println("Failed to publish message: " + ex.getMessage());
                } else {
                    System.out.println("Message published successfully! ID: " + receipt.getMessageId());
                }
            });

            // Block for the example to finish
            future.join();
        }
    }
}

Consuming Messages

You can consume messages from either a Queue or a Stream by providing a message handler as a lambda.

Consuming from a Queue

When consuming from a queue, you must explicitly acknowledge (ack()) each message after successful processing. This signals FlowMQ to remove the message from the queue.

java
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.util.concurrent.CountDownLatch;

public class QueueConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        FlowClientConfig config = FlowClientConfig.builder()
            .brokerAddress("localhost:7070")
            .authentication(Authentication.token("YOUR_API_TOKEN"))
            .build();

        // Define the queue to consume from
        Queue taskQueue = new Queue("image.processing.tasks");
        
        // Use a latch to keep the main thread alive for the example
        CountDownLatch latch = new CountDownLatch(1);

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Starting consumer on queue: " + taskQueue.getName() + "...");
            
            // The consume method runs the handler in the background
            client.consume(taskQueue, (message) -> {
                System.out.println("Received from queue: " + new String(message.getPayload()));
                // Process the message...
                System.out.println("Processing complete. Acknowledging message.");
                message.ack();
                latch.countDown(); // Release latch for this example
            });

            latch.await(); // Wait for a message to be processed
        }
    }
}

Consuming from a Stream

When consuming from a stream, you commit offsets to track your progress. The StartPosition determines where the consumer begins reading in the stream.

java
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import com.flowmq.sdk.streams.Stream;
import com.flowmq.sdk.streams.StartPosition;
import java.util.concurrent.CountDownLatch;

public class StreamConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        FlowClientConfig config = FlowClientConfig.builder()
            .brokerAddress("localhost:7070")
            .authentication(Authentication.token("YOUR_API_TOKEN"))
            .build();

        // Define the stream and where to start consuming
        Stream eventStream = new Stream("user.activity.events", StartPosition.EARLIEST);
        CountDownLatch latch = new CountDownLatch(1);

        try (FlowClient client = FlowClient.create(config)) {
            System.out.println("Starting consumer on stream: " + eventStream.getName() + "...");
            
            client.consume(eventStream, (message) -> {
                System.out.println("Received from stream (offset: " + message.getOffset() + "): " + new String(message.getPayload()));
                // Process the message...
                System.out.println("Processing complete. Committing offset.");
                message.commitOffset();
                latch.countDown();
            });

            latch.await();
        }
    }
}

Full End-to-End Example

This example combines publishing and consuming to demonstrate a complete workflow.

java
import com.flowmq.sdk.*;
import com.flowmq.sdk.auth.Authentication;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FullFlowExample {
    public static void main(String[] args) throws Exception {
        FlowClientConfig config = FlowClientConfig.builder()
            .brokerAddress("localhost:7070")
            .authentication(Authentication.token("YOUR_API_TOKEN"))
            .build();

        try (FlowClient client = FlowClient.create(config)) {
            // 1. Consumer Setup (run in background)
            // (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
            Queue e2eQueue = new Queue("e2e.queue");
            CountDownLatch messageReceivedLatch = new CountDownLatch(1);

            client.consume(e2eQueue, (message) -> {
                System.out.println("Received message: " + new String(message.getPayload()));
                message.ack();
                messageReceivedLatch.countDown();
            });
            System.out.println("Consumer started and waiting for messages...");

            // 2. Producer: Publish a message
            Producer producer = client.newProducer();
            Message message = Message.builder().payload("End-to-End Test Message!".getBytes()).build();
            
            producer.publishAsync("e2e.topic", message).whenComplete((receipt, ex) -> {
                if (ex != null) {
                    System.err.println("Publish failed: " + ex.getMessage());
                } else {
                    System.out.println("Message published successfully. ID: " + receipt.getMessageId());
                }
            }).join(); // Block until publish is complete for simplicity

            // 3. Wait for consumer to receive the message
            if (messageReceivedLatch.await(10, TimeUnit.SECONDS)) {
                System.out.println("Successfully received and acknowledged message.");
            } else {
                System.err.println("Timed out waiting for message.");
            }
        }
    }
}

Additional Resources

  • Error Handling: Chain .exceptionally() calls onto your CompletableFutures to gracefully handle failures.
  • Advanced Configuration: Explore the FlowClientConfig.builder() for options like connection timeouts, request timeouts, and TLS settings.
  • Flow SDK Overview: For a broader understanding of the SDK's architecture and concepts, refer to the Flow SDK Developer Guide.