Connect FlowMQ with Kafka .NET Client using KNet
FlowMQ's native support for the Apache Kafka protocol enables .NET developers to use powerful libraries like KNet to build robust applications. KNet is a comprehensive suite for Apache Kafka that provides a complete set of features for producing, consuming, and managing messages. This guide will walk you through using KNet to interact with FlowMQ.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- .NET 6 or later installed.
- A .NET development environment, such as Visual Studio or the .NET CLI.
Installation
You can add KNet to your .NET project using the NuGet package manager.
# Using .NET CLI
dotnet add package KNet
# Or using Package Manager Console
Install-Package KNet
Producer: Connecting and Sending Messages
A producer is used to send messages to a topic in FlowMQ.
Configuration and Connection
First, configure the producer settings, specifying the BootstrapServers
for your FlowMQ instance.
using MASES.KNet.Producer;
using MASES.KNet.Common;
using System.Threading.Tasks;
public class MyProducer
{
private const string BootstrapServers = "localhost:9092";
private const string TopicName = "my-topic";
public static async Task Main(string[] args)
{
var producerConfig = new KNetProducerConfig
{
BootstrapServers = BootstrapServers
};
using var producer = new KNetProducer<string, string>(producerConfig);
// ...
}
}
Producing a Message
To send a message, create a ProducerRecord
and use the Produce
or ProduceAsync
method. You can await the result to ensure the message was successfully sent.
var record = new ProducerRecord<string, string>(TopicName, "my-key", "Hello FlowMQ from .NET with KNet!");
try
{
var result = await producer.ProduceAsync(record);
Console.WriteLine($"Produced message to topic {result.Topic}, partition {result.Partition}, offset {result.Offset}");
}
catch (Exception ex)
{
Console.WriteLine($"Failed to deliver message: {ex.Message}");
}
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the messages.
Configuration and Connection
Create a consumer instance with the necessary configuration, including BootstrapServers
and a GroupId
.
using MASES.KNet.Consumer;
using System.Threading;
using System;
public class MyConsumer
{
private const string BootstrapServers = "localhost:9092";
private const string GroupId = "my-dotnet-group";
private const string TopicName = "my-topic";
public static void Main(string[] args)
{
var consumerConfig = new KNetConsumerConfig
{
BootstrapServers = BootstrapServers,
GroupId = GroupId,
AutoOffsetReset = KNetConsumerConfig.AutoOffsetResetTypes.Earliest
};
using var consumer = new KNetConsumer<string, string>(consumerConfig);
// ...
}
}
Subscribing and Consuming
Subscribe the consumer to one or more topics and then enter a loop to poll for new records.
consumer.Subscribe(TopicName);
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (!cts.Token.IsCancellationRequested)
{
var records = consumer.Poll(TimeSpan.FromMilliseconds(100));
foreach (var record in records)
{
Console.WriteLine($"Consumed message '{record.Value}' from: '{record.TopicPartitionOffset}'.");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
Additional Resources
- For more detailed documentation and advanced features, visit the official KNet website.
- Explore FlowMQ's advanced features for Kafka integration.