Start with Kafka and .NET Core, Step-by-Step Guide, Part 2: Consumer
Jul 9, 2021
This article continues the previous Kafka post, where we created the Producer. In this part I will write a simple console app that consumes data from the Kafka topic “demo-topic”, which our Producer feeds with data.
1. Create a console project
dotnet new console -n Kafka.Consumer
cd Kafka.Consumer
2. Add the NuGet packages
dotnet add package Confluent.Kafka
dotnet add package Microsoft.Extensions.Hosting
3. Content of Program.cs
namespace Kafka.Consumer
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                {
                    collection.AddHostedService<KafkaConsumerHostedService>();
                });
    }

    public class KafkaConsumerHostedService : BackgroundService
    {
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Consume() is a blocking call, so the loop runs on a background
            // thread instead of blocking the host while it starts up.
            return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
        }

        private static void ConsumeLoop(CancellationToken stoppingToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "demo",
                BootstrapServers = "localhost:9092",
                // With no committed offset for the group, start at the end of the topic.
                AutoOffsetReset = AutoOffsetReset.Latest,
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                consumer.Subscribe("demo-topic");
                try
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        // Pass the host's token so Ctrl+C interrupts a pending Consume().
                        var result = consumer.Consume(stoppingToken);
                        Console.WriteLine($"Message received: {result.Message.Value}");
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
                catch (OperationCanceledException)
                {
                    // Shutdown was requested; nothing to log.
                }
                finally
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    consumer.Close();
                }
            }
        }
    }
}
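The consumer above starts at the latest offset and leaves offset commits to the client's default auto-commit. As a rough sketch of one alternative, assuming you want a fresh consumer group to replay the topic from the beginning and to commit an offset only after the message has been handled, the configuration and loop could look like this. The class name ManualCommitConsumer and the group id "demo-manual" are made up for illustration and are not part of the original project; EnableAutoCommit, AutoOffsetReset.Earliest and Commit come from the Confluent.Kafka client.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper for illustration only; not part of the original project.
public static class ManualCommitConsumer
{
    public static void Run(CancellationToken token)
    {
        var conf = new ConsumerConfig
        {
            // Assumed group id: a group with no stored offsets, so that
            // AutoOffsetReset actually takes effect.
            GroupId = "demo-manual",
            BootstrapServers = "localhost:9092",
            // Replay from the start of the topic when the group has no committed offset.
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed explicitly in the loop below.
            EnableAutoCommit = false,
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            consumer.Subscribe("demo-topic");
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var result = consumer.Consume(token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");

                    // Synchronously commit the offset of the message just handled.
                    consumer.Commit(result);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation requested; fall through to Close().
            }
            finally
            {
                consumer.Close();
            }
        }
    }
}
```

Committing after every message keeps the example easy to follow but costs a round trip to the broker each time; committing every N messages is a common compromise if throughput matters.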
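4. Run the Producer and Consumer
Run the following in both the Producer and the Consumer project folders. Because the consumer uses AutoOffsetReset.Latest, a fresh consumer group only receives messages produced after it has joined, so start the Consumer first.
dotnet run
Enjoy!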
Conclusion
Happy coding. GitHub link: https://github.com/JasminSinanovic/start-with-kafka-dotnet/tree/master/part-2
Next chapter: How to connect a Kafka topic with InfluxDB.